You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2021/01/17 03:53:56 UTC
[druid] branch master updated: Historical unloads damaged segments
automatically when lazy on start. (#10688)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2590ad4 Historical unloads damaged segments automatically when lazy on start. (#10688)
2590ad4 is described below
commit 2590ad4f67fab99760eb4562d84b34e8aa55f6c1
Author: zhangyue19921010 <69...@users.noreply.github.com>
AuthorDate: Sun Jan 17 11:53:30 2021 +0800
Historical unloads damaged segments automatically when lazy on start. (#10688)
* ready to test
* tested on dev cluster
* tested
* code review
* add UTs
* add UTs
* UTs passed
* UTs passed
* optimize imports
* done
* done
* fix checkstyle
* modify UTs
* modify logs
* changing the package of SegmentLazyLoadFailCallback.java to org.apache.druid.segment
* merge from master
* modify import orders
* merge from master
* merge from master
* modify logs
* modify docs
* modify logs to rerun ci
* modify logs to rerun ci
* modify logs to rerun ci
* modify logs to rerun ci
* modify logs to rerun ci
* modify logs to rerun ci
* modify logs to rerun ci
* modify logs to rerun ci
Co-authored-by: yuezhang <yu...@freewheel.tv>
---
.travis.yml | 2 +-
.../AbstractMultiPhaseParallelIndexingTest.java | 3 +-
.../java/org/apache/druid/segment/IndexIO.java | 21 ++++---
.../org/apache/druid/segment/IndexMergerV9.java | 2 +-
.../druid/segment/SegmentLazyLoadFailCallback.java | 19 ++----
...JoinableMMappedQueryableSegmentizerFactory.java | 5 +-
.../MMappedQueryableSegmentizerFactory.java | 5 +-
.../druid/segment/loading/SegmentizerFactory.java | 3 +-
.../segment/CustomSegmentizerFactoryTest.java | 4 +-
.../java/org/apache/druid/segment/IndexIOTest.java | 65 +++++++++++++++++++++
.../table/BroadcastSegmentIndexedTableTest.java | 3 +-
...ableMMappedQueryableSegmentizerFactoryTest.java | 3 +-
.../segmentWithDamagedFile/00000.smoosh | Bin 0 -> 1588 bytes
.../segmentWithDamagedFile/factory.json | 1 +
.../segmentWithDamagedFile/meta.smoosh | 9 +++
.../segmentWithDamagedFile/version.bin | Bin 0 -> 4 bytes
.../druid/segment/loading/SegmentLoader.java | 3 +-
.../loading/SegmentLoaderLocalCacheManager.java | 5 +-
.../org/apache/druid/server/SegmentManager.java | 10 ++--
.../coordination/SegmentLoadDropHandler.java | 5 +-
.../segment/loading/CacheTestSegmentLoader.java | 3 +-
...egmentManagerBroadcastJoinIndexedTableTest.java | 11 ++--
.../apache/druid/server/SegmentManagerTest.java | 29 ++++-----
.../server/SegmentManagerThreadSafetyTest.java | 7 ++-
.../coordination/SegmentLoadDropHandlerTest.java | 2 +-
.../server/coordination/ServerManagerTest.java | 6 +-
26 files changed, 157 insertions(+), 69 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index a25ea08..aa03eee 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -154,7 +154,7 @@ jobs:
# Set MAVEN_OPTS for Surefire launcher. Skip remoteresources to avoid intermittent connection timeouts when
# resolving the SIGAR dependency.
- >
- MAVEN_OPTS='-Xmx800m' ${MVN} test -pl ${MAVEN_PROJECTS}
+ MAVEN_OPTS='-Xmx1100m' ${MVN} test -pl ${MAVEN_PROJECTS}
${MAVEN_SKIP} -Dremoteresources.skip=true -Ddruid.generic.useDefaultValueForNull=${DRUID_USE_DEFAULT_VALUE_FOR_NULL}
- sh -c "dmesg | egrep -i '(oom|out of memory|kill process|killed).*' -C 1 || exit 0"
- free -m
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index b57cede..d06d17c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -51,6 +51,7 @@ import org.apache.druid.query.scan.ScanQueryRunnerFactory;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
@@ -282,7 +283,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
final SegmentLoader loader = new SegmentLoaderFactory(getIndexIO(), getObjectMapper())
.manufacturate(tempSegmentDir);
try {
- return loader.getSegment(dataSegment, false);
+ return loader.getSegment(dataSegment, false, SegmentLazyLoadFailCallback.NOOP);
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
index 6558072..df43f63 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
@@ -182,16 +182,17 @@ public class IndexIO
public QueryableIndex loadIndex(File inDir) throws IOException
{
- return loadIndex(inDir, false);
+ return loadIndex(inDir, false, SegmentLazyLoadFailCallback.NOOP);
}
- public QueryableIndex loadIndex(File inDir, boolean lazy) throws IOException
+
+ public QueryableIndex loadIndex(File inDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws IOException
{
final int version = SegmentUtils.getVersionFromDir(inDir);
final IndexLoader loader = indexLoaders.get(version);
if (loader != null) {
- return loader.load(inDir, mapper, lazy);
+ return loader.load(inDir, mapper, lazy, loadFailed);
} else {
throw new ISE("Unknown index version[%s]", version);
}
@@ -412,7 +413,7 @@ public class IndexIO
interface IndexLoader
{
- QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException;
+ QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws IOException;
}
static class LegacyIndexLoader implements IndexLoader
@@ -427,7 +428,7 @@ public class IndexIO
}
@Override
- public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException
+ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws IOException
{
MMappedIndex index = legacyHandler.mapDir(inDir);
@@ -522,7 +523,7 @@ public class IndexIO
}
@Override
- public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException
+ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws IOException
{
log.debug("Mapping v9 index[%s]", inDir);
long startTime = System.currentTimeMillis();
@@ -598,7 +599,9 @@ public class IndexIO
try {
return deserializeColumn(mapper, colBuffer, smooshedFiles);
}
- catch (IOException e) {
+ catch (IOException | RuntimeException e) {
+ log.warn(e, "Throw exceptions when deserialize column [%s].", columnName);
+ loadFailed.execute();
throw Throwables.propagate(e);
}
}
@@ -618,7 +621,9 @@ public class IndexIO
try {
return deserializeColumn(mapper, timeBuffer, smooshedFiles);
}
- catch (IOException e) {
+ catch (IOException | RuntimeException e) {
+ log.warn(e, "Throw exceptions when deserialize column [%s]", ColumnHolder.TIME_COLUMN_NAME);
+ loadFailed.execute();
throw Throwables.propagate(e);
}
}
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
index cb65289..02618da 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
@@ -1011,7 +1011,7 @@ public class IndexMergerV9 implements IndexMerger
// convert Files to QueryableIndexIndexableAdapter and do another merge phase
List<IndexableAdapter> qIndexAdapters = new ArrayList<>();
for (File outputFile : currentOutputs) {
- QueryableIndex qIndex = indexIO.loadIndex(outputFile, true);
+ QueryableIndex qIndex = indexIO.loadIndex(outputFile, true, SegmentLazyLoadFailCallback.NOOP);
qIndexAdapters.add(new QueryableIndexIndexableAdapter(qIndex));
}
currentPhases = getMergePhases(qIndexAdapters, maxColumnsToMerge);
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/processing/src/main/java/org/apache/druid/segment/SegmentLazyLoadFailCallback.java
similarity index 60%
copy from server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
copy to processing/src/main/java/org/apache/druid/segment/SegmentLazyLoadFailCallback.java
index f63024a..babc6ba 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
+++ b/processing/src/main/java/org/apache/druid/segment/SegmentLazyLoadFailCallback.java
@@ -17,21 +17,10 @@
* under the License.
*/
-package org.apache.druid.segment.loading;
+package org.apache.druid.segment;
-import org.apache.druid.segment.Segment;
-import org.apache.druid.timeline.DataSegment;
-
-import java.io.File;
-
-/**
- * Loading segments from deep storage to local storage.
- * Implementations must be thread-safe.
- */
-public interface SegmentLoader
+public interface SegmentLazyLoadFailCallback
{
- boolean isSegmentLoaded(DataSegment segment);
- Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException;
- File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
- void cleanup(DataSegment segment);
+ void execute();
+ SegmentLazyLoadFailCallback NOOP = () -> {};
}
diff --git a/processing/src/main/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactory.java b/processing/src/main/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactory.java
index c675ea5..75f2882 100644
--- a/processing/src/main/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactory.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.join.table.BroadcastSegmentIndexedTable;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.timeline.DataSegment;
@@ -57,10 +58,10 @@ public class BroadcastJoinableMMappedQueryableSegmentizerFactory implements Segm
}
@Override
- public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy) throws SegmentLoadingException
+ public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
{
try {
- return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy), dataSegment.getId()) {
+ return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy, loadFailed), dataSegment.getId()) {
@Nullable
@Override
public <T> T as(Class<T> clazz)
diff --git a/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java b/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java
index ea5290e..d3124cb 100644
--- a/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.timeline.DataSegment;
import java.io.File;
@@ -42,10 +43,10 @@ public class MMappedQueryableSegmentizerFactory implements SegmentizerFactory
}
@Override
- public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy) throws SegmentLoadingException
+ public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
{
try {
- return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy), dataSegment.getId());
+ return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy, loadFailed), dataSegment.getId());
}
catch (IOException e) {
throw new SegmentLoadingException(e, "%s", e.getMessage());
diff --git a/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java b/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java
index 09bc048..25df3c5 100644
--- a/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.loading;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.timeline.DataSegment;
import java.io.File;
@@ -31,5 +32,5 @@ import java.io.File;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = MMappedQueryableSegmentizerFactory.class)
public interface SegmentizerFactory
{
- Segment factorize(DataSegment segment, File parentDir, boolean lazy) throws SegmentLoadingException;
+ Segment factorize(DataSegment segment, File parentDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
}
diff --git a/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java
index 8b4a8d3..9728f23 100644
--- a/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java
@@ -126,10 +126,10 @@ public class CustomSegmentizerFactoryTest extends InitializedNullHandlingTest
private static class CustomSegmentizerFactory implements SegmentizerFactory
{
@Override
- public Segment factorize(DataSegment segment, File parentDir, boolean lazy) throws SegmentLoadingException
+ public Segment factorize(DataSegment segment, File parentDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
{
try {
- return new QueryableIndexSegment(INDEX_IO.loadIndex(parentDir, lazy), segment.getId());
+ return new QueryableIndexSegment(INDEX_IO.loadIndex(parentDir, lazy, loadFailed), segment.getId());
}
catch (IOException e) {
throw new SegmentLoadingException(e, "%s", e.getMessage());
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java b/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java
index 5066bef..b6a9155 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
@@ -29,6 +30,7 @@ import com.google.common.collect.Maps;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.aggregation.Aggregator;
@@ -50,6 +52,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
+import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@@ -338,4 +341,66 @@ public class IndexIOTest extends InitializedNullHandlingTest
}
}
}
+
+ @Test
+ public void testLoadSegmentDamagedFileWithLazy()
+ {
+ final ObjectMapper mapper = new DefaultObjectMapper();
+ final IndexIO indexIO = new IndexIO(mapper, () -> 0);
+ String path = this.getClass().getClassLoader().getResource("v9SegmentPersistDir/segmentWithDamagedFile/").getPath();
+
+ ForkSegmentLoadDropHandler segmentLoadDropHandler = new ForkSegmentLoadDropHandler();
+ ForkSegment segment = new ForkSegment(true);
+ Assert.assertTrue(segment.getSegmentExist());
+ File inDir = new File(path);
+ Exception e = null;
+
+ try {
+ QueryableIndex queryableIndex = indexIO.loadIndex(inDir, true, () -> segmentLoadDropHandler.removeSegment(segment));
+ Assert.assertNotNull(queryableIndex);
+ queryableIndex.getDimensionHandlers();
+ List<String> columnNames = queryableIndex.getColumnNames();
+ for (String columnName : columnNames) {
+ queryableIndex.getColumnHolder(columnName).toString();
+ }
+ }
+ catch (Exception ex) {
+ // Expected: loading columns of the damaged segment throws; keep the exception for the assertions below.
+ e = ex;
+ }
+ Assert.assertNotNull(e);
+ Assert.assertFalse(segment.getSegmentExist());
+
+ }
+
+ private static class ForkSegmentLoadDropHandler
+ {
+ public void addSegment()
+ {
+ }
+ public void removeSegment(ForkSegment segment)
+ {
+ segment.setSegmentExist(false);
+ }
+ }
+
+ private static class ForkSegment
+ {
+ private Boolean segmentExist;
+
+ ForkSegment(Boolean segmentExist)
+ {
+ this.segmentExist = segmentExist;
+ }
+
+ void setSegmentExist(Boolean value)
+ {
+ this.segmentExist = value;
+ }
+
+ Boolean getSegmentExist()
+ {
+ return this.segmentExist;
+ }
+ }
}
diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java
index 524f2ac..4635a7b 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.BaseColumn;
@@ -137,7 +138,7 @@ public class BroadcastSegmentIndexedTableTest extends InitializedNullHandlingTes
null,
segment.getTotalSpace()
);
- backingSegment = (QueryableIndexSegment) factory.factorize(dataSegment, segment, false);
+ backingSegment = (QueryableIndexSegment) factory.factorize(dataSegment, segment, false, SegmentLazyLoadFailCallback.NOOP);
columnNames = ImmutableList.<String>builder().add(ColumnHolder.TIME_COLUMN_NAME)
.addAll(backingSegment.asQueryableIndex().getColumnNames()).build();
diff --git a/processing/src/test/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactoryTest.java
index ab4c453..0e8ac93 100644
--- a/processing/src/test/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactoryTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.join.table.BroadcastSegmentIndexedTable;
@@ -119,7 +120,7 @@ public class BroadcastJoinableMMappedQueryableSegmentizerFactoryTest extends Ini
null,
persistedSegmentRoot.getTotalSpace()
);
- final Segment loaded = factory.factorize(dataSegment, persistedSegmentRoot, false);
+ final Segment loaded = factory.factorize(dataSegment, persistedSegmentRoot, false, SegmentLazyLoadFailCallback.NOOP);
final BroadcastSegmentIndexedTable table = (BroadcastSegmentIndexedTable) loaded.as(IndexedTable.class);
Assert.assertNotNull(table);
diff --git a/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/00000.smoosh b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/00000.smoosh
new file mode 100644
index 0000000..aaf3449
Binary files /dev/null and b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/00000.smoosh differ
diff --git a/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/factory.json b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/factory.json
new file mode 100644
index 0000000..f7b2cc3
--- /dev/null
+++ b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/factory.json
@@ -0,0 +1 @@
+{"type":"mMapSegmentFactory"}
\ No newline at end of file
diff --git a/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/meta.smoosh b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/meta.smoosh
new file mode 100644
index 0000000..70a831f
--- /dev/null
+++ b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/meta.smoosh
@@ -0,0 +1,9 @@
+v1,2147483647,1
+__time,0,0,141
+count,0,141,282
+dstIP,0,564,805
+index.drd,0,1046,1205
+metadata.drd,0,1205,1587
+srcIP,0,805,1046
+sum_bytes,0,282,423
+sum_packets,0,423,564
diff --git a/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/version.bin b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/version.bin
new file mode 100644
index 0000000..3dd5ace
Binary files /dev/null and b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/version.bin differ
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
index f63024a..741cfa1 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
@@ -20,6 +20,7 @@
package org.apache.druid.segment.loading;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.timeline.DataSegment;
import java.io.File;
@@ -31,7 +32,7 @@ import java.io.File;
public interface SegmentLoader
{
boolean isSegmentLoaded(DataSegment segment);
- Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException;
+ Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
void cleanup(DataSegment segment);
}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index 16bf499..cd4d3fd8 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nonnull;
@@ -177,7 +178,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
}
@Override
- public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException
+ public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
{
final ReferenceCountingLock lock = createOrGetLock(segment);
final File segmentFiles;
@@ -203,7 +204,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
factory = new MMappedQueryableSegmentizerFactory(indexIO);
}
- return factory.factorize(segment, segmentFiles, lazy);
+ return factory.factorize(segment, segmentFiles, lazy, loadFailed);
}
/**
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index 3b3d891..c3636d3 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -31,6 +31,7 @@ import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable;
import org.apache.druid.segment.loading.SegmentLoader;
@@ -214,14 +215,15 @@ public class SegmentManager
*
* @param segment segment to load
* @param lazy whether to lazy load columns metadata
+ * @param loadFailed callback to execute when lazy loading of the segment fails
*
* @return true if the segment was newly loaded, false if it was already loaded
*
* @throws SegmentLoadingException if the segment cannot be loaded
*/
- public boolean loadSegment(final DataSegment segment, boolean lazy) throws SegmentLoadingException
+ public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
{
- final Segment adapter = getAdapter(segment, lazy);
+ final Segment adapter = getAdapter(segment, lazy, loadFailed);
final SettableSupplier<Boolean> resultSupplier = new SettableSupplier<>();
@@ -271,11 +273,11 @@ public class SegmentManager
return resultSupplier.get();
}
- private Segment getAdapter(final DataSegment segment, boolean lazy) throws SegmentLoadingException
+ private Segment getAdapter(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
{
final Segment adapter;
try {
- adapter = segmentLoader.getSegment(segment, lazy);
+ adapter = segmentLoader.getSegment(segment, lazy, loadFailed);
}
catch (SegmentLoadingException e) {
segmentLoader.cleanup(segment);
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 0cb0ec0..dcc0309 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -269,7 +269,10 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
{
final boolean loaded;
try {
- loaded = segmentManager.loadSegment(segment, lazy);
+ loaded = segmentManager.loadSegment(segment,
+ lazy,
+ () -> this.removeSegment(segment, DataSegmentChangeCallback.NOOP, false)
+ );
}
catch (Exception e) {
removeSegment(segment, callback, false);
diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
index a42c553..557537c 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.loading;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@@ -46,7 +47,7 @@ public class CacheTestSegmentLoader implements SegmentLoader
}
@Override
- public Segment getSegment(final DataSegment segment, boolean lazy)
+ public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
{
return new Segment()
{
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java
index 3bb8768..38c4100 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java
@@ -41,6 +41,7 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.join.BroadcastTableJoinableFactory;
@@ -159,7 +160,7 @@ public class SegmentManagerBroadcastJoinIndexedTableTest extends InitializedNull
IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv");
final String interval = "2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z";
DataSegment segment = createSegment(data, interval, version);
- Assert.assertTrue(segmentManager.loadSegment(segment, false));
+ Assert.assertTrue(segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP));
Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource));
Optional<Joinable> maybeJoinable = makeJoinable(dataSource);
@@ -208,8 +209,8 @@ public class SegmentManagerBroadcastJoinIndexedTableTest extends InitializedNull
IncrementalIndex data2 = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.bottom");
DataSegment segment1 = createSegment(data, interval, version);
DataSegment segment2 = createSegment(data2, interval2, version2);
- Assert.assertTrue(segmentManager.loadSegment(segment1, false));
- Assert.assertTrue(segmentManager.loadSegment(segment2, false));
+ Assert.assertTrue(segmentManager.loadSegment(segment1, false, SegmentLazyLoadFailCallback.NOOP));
+ Assert.assertTrue(segmentManager.loadSegment(segment2, false, SegmentLazyLoadFailCallback.NOOP));
Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource));
Optional<Joinable> maybeJoinable = makeJoinable(dataSource);
@@ -271,7 +272,7 @@ public class SegmentManagerBroadcastJoinIndexedTableTest extends InitializedNull
final String interval2 = "2011-01-12T00:00:00.000Z/2011-03-28T00:00:00.000Z";
IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.bottom");
IncrementalIndex data2 = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.top");
- Assert.assertTrue(segmentManager.loadSegment(createSegment(data, interval, version), false));
+ Assert.assertTrue(segmentManager.loadSegment(createSegment(data, interval, version), false, SegmentLazyLoadFailCallback.NOOP));
Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource));
Optional<Joinable> maybeJoinable = makeJoinable(dataSource);
@@ -293,7 +294,7 @@ public class SegmentManagerBroadcastJoinIndexedTableTest extends InitializedNull
);
// add another segment with smaller interval, only partially overshadows so there will be 2 segments in timeline
- Assert.assertTrue(segmentManager.loadSegment(createSegment(data2, interval2, version2), false));
+ Assert.assertTrue(segmentManager.loadSegment(createSegment(data2, interval2, version2), false, SegmentLazyLoadFailCallback.NOOP));
expectedException.expect(ISE.class);
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index a91411b..762339f 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoadingException;
@@ -69,7 +70,7 @@ public class SegmentManagerTest
}
@Override
- public Segment getSegment(final DataSegment segment, boolean lazy)
+ public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
{
return new SegmentForTesting(
MapUtils.getString(segment.getLoadSpec(), "version"),
@@ -222,7 +223,7 @@ public class SegmentManagerTest
final List<Future<Boolean>> futures = SEGMENTS.stream()
.map(
segment -> executor.submit(
- () -> segmentManager.loadSegment(segment, false)
+ () -> segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP)
)
)
.collect(Collectors.toList());
@@ -238,7 +239,7 @@ public class SegmentManagerTest
public void testDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException
{
for (DataSegment eachSegment : SEGMENTS) {
- Assert.assertTrue(segmentManager.loadSegment(eachSegment, false));
+ Assert.assertTrue(segmentManager.loadSegment(eachSegment, false, SegmentLazyLoadFailCallback.NOOP));
}
final List<Future<Void>> futures = ImmutableList.of(SEGMENTS.get(0), SEGMENTS.get(2)).stream()
@@ -264,14 +265,14 @@ public class SegmentManagerTest
@Test
public void testLoadDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException
{
- Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0), false));
- Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(2), false));
+ Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0), false, SegmentLazyLoadFailCallback.NOOP));
+ Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(2), false, SegmentLazyLoadFailCallback.NOOP));
final List<Future<Boolean>> loadFutures = ImmutableList.of(SEGMENTS.get(1), SEGMENTS.get(3), SEGMENTS.get(4))
.stream()
.map(
segment -> executor.submit(
- () -> segmentManager.loadSegment(segment, false)
+ () -> segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP)
)
)
.collect(Collectors.toList());
@@ -302,10 +303,10 @@ public class SegmentManagerTest
public void testLoadDuplicatedSegmentsSequentially() throws SegmentLoadingException
{
for (DataSegment segment : SEGMENTS) {
- Assert.assertTrue(segmentManager.loadSegment(segment, false));
+ Assert.assertTrue(segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP));
}
// try to load an existing segment
- Assert.assertFalse(segmentManager.loadSegment(SEGMENTS.get(0), false));
+ Assert.assertFalse(segmentManager.loadSegment(SEGMENTS.get(0), false, SegmentLazyLoadFailCallback.NOOP));
assertResult(SEGMENTS);
}
@@ -318,7 +319,7 @@ public class SegmentManagerTest
.stream()
.map(
segment -> executor.submit(
- () -> segmentManager.loadSegment(segment, false)
+ () -> segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP)
)
)
.collect(Collectors.toList());
@@ -339,7 +340,7 @@ public class SegmentManagerTest
@Test
public void testNonExistingSegmentsSequentially() throws SegmentLoadingException
{
- Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0), false));
+ Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0), false, SegmentLazyLoadFailCallback.NOOP));
// try to drop a non-existing segment of different data source
segmentManager.dropSegment(SEGMENTS.get(2));
@@ -352,7 +353,7 @@ public class SegmentManagerTest
public void testNonExistingSegmentsInParallel()
throws SegmentLoadingException, ExecutionException, InterruptedException
{
- segmentManager.loadSegment(SEGMENTS.get(0), false);
+ segmentManager.loadSegment(SEGMENTS.get(0), false, SegmentLazyLoadFailCallback.NOOP);
final List<Future<Void>> futures = ImmutableList.of(SEGMENTS.get(1), SEGMENTS.get(2))
.stream()
.map(
@@ -375,7 +376,7 @@ public class SegmentManagerTest
@Test
public void testRemoveEmptyTimeline() throws SegmentLoadingException
{
- segmentManager.loadSegment(SEGMENTS.get(0), false);
+ segmentManager.loadSegment(SEGMENTS.get(0), false, SegmentLazyLoadFailCallback.NOOP);
assertResult(ImmutableList.of(SEGMENTS.get(0)));
Assert.assertEquals(1, segmentManager.getDataSources().size());
segmentManager.dropSegment(SEGMENTS.get(0));
@@ -412,7 +413,7 @@ public class SegmentManagerTest
10
);
- segmentManager.loadSegment(segment, false);
+ segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP);
assertResult(ImmutableList.of(segment));
segmentManager.dropSegment(segment);
@@ -442,7 +443,7 @@ public class SegmentManagerTest
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(
- ReferenceCountingSegment.wrapSegment(SEGMENT_LOADER.getSegment(segment, false), segment.getShardSpec())
+ ReferenceCountingSegment.wrapSegment(SEGMENT_LOADER.getSegment(segment, false, SegmentLazyLoadFailCallback.NOOP), segment.getShardSpec())
)
);
}
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
index e5593f1..87587ce 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
@@ -129,7 +130,7 @@ public class SegmentManagerThreadSafetyTest
final DataSegment segment = createSegment("2019-01-01/2019-01-02");
final List<Future> futures = IntStream
.range(0, 16)
- .mapToObj(i -> exec.submit(() -> segmentManager.loadSegment(segment, false)))
+ .mapToObj(i -> exec.submit(() -> segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP)))
.collect(Collectors.toList());
for (Future future : futures) {
future.get();
@@ -154,7 +155,7 @@ public class SegmentManagerThreadSafetyTest
.mapToObj(i -> exec.submit(() -> {
for (DataSegment segment : segments) {
try {
- segmentManager.loadSegment(segment, false);
+ segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP);
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
@@ -222,7 +223,7 @@ public class SegmentManagerThreadSafetyTest
private static class TestSegmentizerFactory implements SegmentizerFactory
{
@Override
- public Segment factorize(DataSegment segment, File parentDir, boolean lazy)
+ public Segment factorize(DataSegment segment, File parentDir, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
{
return new Segment()
{
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index a979784..8aa9f36 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -539,7 +539,7 @@ public class SegmentLoadDropHandlerTest
public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequestShouldSucceed() throws Exception
{
final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
- Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean()))
+ Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any()))
.thenThrow(new RuntimeException("segment loading failure test"))
.thenReturn(true);
final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
index 4f00cde..6f8d5a6 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
@@ -75,6 +75,7 @@ import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
@@ -151,7 +152,7 @@ public class ServerManagerTest
}
@Override
- public Segment getSegment(final DataSegment segment, boolean lazy)
+ public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
{
return new SegmentForTesting(
MapUtils.getString(segment.getLoadSpec(), "version"),
@@ -674,7 +675,8 @@ public class ServerManagerTest
IndexIO.CURRENT_VERSION_ID,
123L
),
- false
+ false,
+ SegmentLazyLoadFailCallback.NOOP
);
}
catch (SegmentLoadingException e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org