Posted to commits@druid.apache.org by as...@apache.org on 2019/07/21 13:23:56 UTC
[incubator-druid] branch master updated: Spotbugs: NP_STORE_INTO_NONNULL_FIELD (#8021)
This is an automated email from the ASF dual-hosted git repository.
asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new e1a7457 Spotbugs: NP_STORE_INTO_NONNULL_FIELD (#8021)
e1a7457 is described below
commit e1a745717e0b008ca76a0a27e96d3097fc48a8f9
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Sun Jul 21 15:23:47 2019 +0200
Spotbugs: NP_STORE_INTO_NONNULL_FIELD (#8021)
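For context: SpotBugs reports NP_STORE_INTO_NONNULL_FIELD when a null value is
stored into a field it considers non-null, which includes any field that is not
annotated @Nullable. This commit removes the blanket exclusion for the pattern
from codestyle/spotbugs-exclude.xml and instead annotates each offending field.
A minimal sketch of the before/after shape, assuming the JSR-305
javax.annotation.Nullable annotation on the classpath as Druid uses it; the
class and field here are hypothetical, not taken from this diff:

    import javax.annotation.Nullable;

    // Without the @Nullable annotation below, SpotBugs treats `sketch` as
    // non-null and flags the store in close() as NP_STORE_INTO_NONNULL_FIELD.
    public class ExampleAggregator
    {
      @Nullable
      private Object sketch; // stands in for e.g. UpdateDoublesSketch

      public ExampleAggregator(Object sketch)
      {
        this.sketch = sketch;
      }

      public void close()
      {
        sketch = null; // allowed: the field is explicitly nullable
      }
    }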
---
codestyle/spotbugs-exclude.xml | 1 -
.../quantiles/DoublesSketchBuildAggregator.java | 3 +
.../quantiles/DoublesSketchMergeAggregator.java | 3 +
.../datasketches/theta/SketchAggregator.java | 4 +
.../datasketches/theta/SketchHolder.java | 6 +-
.../tuple/ArrayOfDoublesSketchBuildAggregator.java | 4 +
.../ArrayOfDoublesSketchBuildBufferAggregator.java | 3 +
.../tuple/ArrayOfDoublesSketchMergeAggregator.java | 3 +
.../druid/query/groupby/GroupByQueryEngine.java | 7 +-
.../groupby/epinephelinae/BufferArrayGrouper.java | 4 +
.../groupby/epinephelinae/BufferHashGrouper.java | 41 +-
.../epinephelinae/GroupByQueryEngineV2.java | 1 +
.../epinephelinae/RowBasedGrouperHelper.java | 559 ++++++++-------------
.../org/apache/druid/segment/MetricHolder.java | 4 +
.../segment/SingleScanTimeDimensionSelector.java | 4 +-
.../druid/segment/StringDimensionIndexer.java | 6 +-
.../druid/segment/StringDimensionMergerV9.java | 42 +-
.../apache/druid/segment/column/ColumnBuilder.java | 9 +-
.../segment/column/ColumnCapabilitiesImpl.java | 4 +
.../druid/segment/column/ColumnDescriptor.java | 4 +
.../data/BlockLayoutColumnarDoublesSerializer.java | 3 +
.../data/BlockLayoutColumnarFloatsSerializer.java | 3 +
.../data/BlockLayoutColumnarLongsSerializer.java | 3 +
.../druid/segment/data/ByteBufferWriter.java | 4 +
.../data/CompressedColumnarIntsSerializer.java | 6 +-
.../CompressedVSizeColumnarIntsSerializer.java | 5 +-
.../apache/druid/segment/data/GenericIndexed.java | 1 +
.../druid/segment/data/GenericIndexedWriter.java | 3 +
.../data/IntermediateColumnarLongsSerializer.java | 6 +-
.../segment/data/LongsLongEncodingWriter.java | 4 +
.../segment/data/VSizeColumnarIntsSerializer.java | 6 +-
.../data/VSizeColumnarMultiIntsSerializer.java | 4 +
.../apache/druid/segment/data/VSizeLongSerde.java | 9 +-
.../segment/incremental/IncrementalIndex.java | 88 ++--
.../incremental/IncrementalIndexAdapter.java | 1 +
.../incremental/IncrementalIndexRowHolder.java | 3 +
.../incremental/OffheapIncrementalIndex.java | 5 +
.../incremental/OnheapIncrementalIndex.java | 4 +-
.../segment/serde/ComplexColumnPartSerde.java | 18 +-
.../serde/DictionaryEncodedColumnPartSerde.java | 10 +-
.../serde/DoubleNumericColumnPartSerde.java | 37 +-
.../serde/DoubleNumericColumnPartSerdeV2.java | 5 +-
.../segment/serde/FloatNumericColumnPartSerde.java | 2 +
.../serde/FloatNumericColumnPartSerdeV2.java | 5 +-
.../segment/serde/LongNumericColumnPartSerde.java | 35 +-
.../serde/LongNumericColumnPartSerdeV2.java | 11 +-
.../firehose/EventReceiverFirehoseFactory.java | 5 +-
.../realtime/firehose/PredicateFirehose.java | 1 +
.../realtime/firehose/SqlFirehoseFactory.java | 3 +
.../coordination/BatchDataSegmentAnnouncer.java | 24 +-
.../BatchDataSegmentAnnouncerProvider.java | 6 +-
.../coordination/ChangeRequestHttpSyncer.java | 5 +-
.../coordination/SegmentLoadDropHandler.java | 97 ++--
.../druid/server/coordination/ZkCoordinator.java | 57 +--
54 files changed, 588 insertions(+), 603 deletions(-)
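
Alongside the annotations, several hunks below also replace anonymous
Comparator and extract-function classes with lambdas or method references
(see BufferHashGrouper and RowBasedGrouperHelper). A simplified,
self-contained illustration of that refactor; the list and ordering are
made up for the example and do not appear in this commit:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public class LambdaRefactorExample
    {
      public static void main(String[] args)
      {
        List<Integer> offsets = new ArrayList<>(Arrays.asList(30, 10, 20));

        // Before: anonymous inner class.
        Collections.sort(offsets, new Comparator<Integer>()
        {
          @Override
          public int compare(Integer lhs, Integer rhs)
          {
            return Integer.compare(lhs, rhs);
          }
        });

        // After: the equivalent lambda, matching the style used in this commit.
        offsets.sort((lhs, rhs) -> Integer.compare(lhs, rhs));

        // Or a method reference where the target matches directly.
        offsets.sort(Integer::compare);

        System.out.println(offsets); // [10, 20, 30]
      }
    }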
diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml
index e1fdbf7..44acdb8 100644
--- a/codestyle/spotbugs-exclude.xml
+++ b/codestyle/spotbugs-exclude.xml
@@ -70,7 +70,6 @@
<Bug pattern="NP_NULL_PARAM_DEREF"/>
<Bug pattern="NP_NULL_PARAM_DEREF_NONVIRTUAL"/>
<Bug pattern="NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE"/>
- <Bug pattern="NP_STORE_INTO_NONNULL_FIELD"/>
<Bug pattern="NS_DANGEROUS_NON_SHORT_CIRCUIT"/>
<Bug pattern="OBL_UNSATISFIED_OBLIGATION"/>
<Bug pattern="OS_OPEN_STREAM"/>
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java
index 18f94a9..7e0ff89 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java
@@ -24,11 +24,14 @@ import com.yahoo.sketches.quantiles.UpdateDoublesSketch;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.ColumnValueSelector;
+import javax.annotation.Nullable;
+
public class DoublesSketchBuildAggregator implements Aggregator
{
private final ColumnValueSelector<Double> valueSelector;
+ @Nullable
private UpdateDoublesSketch sketch;
public DoublesSketchBuildAggregator(final ColumnValueSelector<Double> valueSelector, final int size)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
index 4598048..8c19e33 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
@@ -24,10 +24,13 @@ import com.yahoo.sketches.quantiles.DoublesUnion;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.ColumnValueSelector;
+import javax.annotation.Nullable;
+
public class DoublesSketchMergeAggregator implements Aggregator
{
private final ColumnValueSelector selector;
+ @Nullable
private DoublesUnion union;
public DoublesSketchMergeAggregator(final ColumnValueSelector selector, final int k)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
index 075d8c7..ee1ddd2 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
@@ -26,12 +26,16 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import javax.annotation.Nullable;
+
import java.util.List;
public class SketchAggregator implements Aggregator
{
private final BaseObjectColumnValueSelector selector;
private final int size;
+
+ @Nullable
private Union union;
public SketchAggregator(BaseObjectColumnValueSelector selector, int size)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchHolder.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchHolder.java
index 9ac9056..9ba8cda 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchHolder.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchHolder.java
@@ -35,6 +35,8 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.Comparator;
@@ -49,7 +51,7 @@ public class SketchHolder
);
public static final Comparator<Object> COMPARATOR = Ordering.from(
- new Comparator()
+ new Comparator<Object>()
{
@Override
public int compare(Object o1, Object o2)
@@ -108,7 +110,9 @@ public class SketchHolder
private final Object obj;
+ @Nullable
private volatile Double cachedEstimate = null;
+ @Nullable
private volatile Sketch cachedSketch = null;
private SketchHolder(Object obj)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
index 11a3061..2781bc5 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
@@ -26,6 +26,8 @@ import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.data.IndexedInts;
+import javax.annotation.Nullable;
+
import java.util.List;
/**
@@ -38,7 +40,9 @@ public class ArrayOfDoublesSketchBuildAggregator implements Aggregator
private final DimensionSelector keySelector;
private final BaseDoubleColumnValueSelector[] valueSelectors;
+ @Nullable
private double[] values; // not part of the state, but to reuse in aggregate() method
+ @Nullable
private ArrayOfDoublesUpdatableSketch sketch;
public ArrayOfDoublesSketchBuildAggregator(
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
index 051d59f..1c79df8 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
@@ -30,6 +30,8 @@ import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.data.IndexedInts;
+import javax.annotation.Nullable;
+
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
@@ -50,6 +52,7 @@ public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregat
private final BaseDoubleColumnValueSelector[] valueSelectors;
private final int nominalEntries;
private final int maxIntermediateSize;
+ @Nullable
private double[] values; // not part of the state, but to reuse in aggregate() method
private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java
index 3f99ce9..db33fb9 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java
@@ -25,6 +25,8 @@ import com.yahoo.sketches.tuple.ArrayOfDoublesUnion;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import javax.annotation.Nullable;
+
/**
* This aggregator merges existing sketches.
* The input column contains ArrayOfDoublesSketch.
@@ -34,6 +36,7 @@ public class ArrayOfDoublesSketchMergeAggregator implements Aggregator
{
private final BaseObjectColumnValueSelector<ArrayOfDoublesSketch> selector;
+ @Nullable
private ArrayOfDoublesUnion union;
public ArrayOfDoublesSketchMergeAggregator(
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java
index 1383b20..96a0b76 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java
@@ -298,14 +298,13 @@ public class GroupByQueryEngine
private final ByteBuffer metricsBuffer;
private final int maxIntermediateRows;
- private final List<DimensionSpec> dimensionSpecs;
private final List<DimensionSelector> dimensions;
private final ArrayList<String> dimNames;
- private final List<AggregatorFactory> aggregatorSpecs;
private final BufferAggregator[] aggregators;
private final String[] metricNames;
private final int[] sizesRequired;
+ @Nullable
private List<ByteBuffer> unprocessedKeys;
private Iterator<Row> delegate;
@@ -320,7 +319,7 @@ public class GroupByQueryEngine
unprocessedKeys = null;
delegate = Collections.emptyIterator();
- dimensionSpecs = query.getDimensions();
+ List<DimensionSpec> dimensionSpecs = query.getDimensions();
dimensions = Lists.newArrayListWithExpectedSize(dimensionSpecs.size());
dimNames = Lists.newArrayListWithExpectedSize(dimensionSpecs.size());
@@ -340,7 +339,7 @@ public class GroupByQueryEngine
dimNames.add(dimSpec.getOutputName());
}
- aggregatorSpecs = query.getAggregatorSpecs();
+ List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
aggregators = new BufferAggregator[aggregatorSpecs.size()];
metricNames = new String[aggregatorSpecs.size()];
sizesRequired = new int[aggregatorSpecs.size()];
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
index 7fd34bf..a0fc937 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
@@ -28,6 +28,8 @@ import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -59,7 +61,9 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
private ByteBuffer valBuffer;
// Scratch objects used by aggregateVector(). Only set if initVectorized() is called.
+ @Nullable
private int[] vAggregationPositions = null;
+ @Nullable
private int[] vAggregationRows = null;
static long requiredBufferCapacity(
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
index 1799579..a695066 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
@@ -28,10 +28,11 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import javax.annotation.Nullable;
+
import java.nio.ByteBuffer;
import java.util.AbstractList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.ToIntFunction;
@@ -42,7 +43,6 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
private static final int DEFAULT_INITIAL_BUCKETS = 1024;
private static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f;
- private ByteBuffer buffer;
private boolean initialized = false;
// The BufferHashGrouper normally sorts by all fields of the grouping key with lexicographic ascending order.
@@ -54,16 +54,17 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
// to get a comparator that uses the ordering defined by the OrderByColumnSpec of a query.
private final boolean useDefaultSorting;
- // Track the offsets of used buckets using this list.
- // When a new bucket is initialized by initializeNewBucketKey(), an offset is added to this list.
- // When expanding the table, the list is reset() and filled with the new offsets of the copied buckets.
- private ByteBuffer offsetListBuffer;
+ @Nullable
private ByteBufferIntList offsetList;
// Scratch objects used by aggregateVector(). Only set if initVectorized() is called.
+ @Nullable
private ByteBuffer vKeyBuffer = null;
+ @Nullable
private int[] vKeyHashCodes = null;
+ @Nullable
private int[] vAggregationPositions = null;
+ @Nullable
private int[] vAggregationRows = null;
public BufferHashGrouper(
@@ -93,7 +94,7 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
public void init()
{
if (!initialized) {
- this.buffer = bufferSupplier.get();
+ ByteBuffer buffer = bufferSupplier.get();
int hashTableSize = ByteBufferHashTable.calculateTableArenaSizeWithPerBucketAdditionalSize(
buffer.capacity(),
@@ -106,7 +107,10 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
hashTableBuffer.limit(hashTableSize);
hashTableBuffer = hashTableBuffer.slice();
- offsetListBuffer = buffer.duplicate();
+ // Track the offsets of used buckets using this list.
+ // When a new bucket is initialized by initializeNewBucketKey(), an offset is added to this list.
+ // When expanding the table, the list is reset() and filled with the new offsets of the copied buckets.
+ ByteBuffer offsetListBuffer = buffer.duplicate();
offsetListBuffer.position(hashTableSize);
offsetListBuffer.limit(buffer.capacity());
offsetListBuffer = offsetListBuffer.slice();
@@ -315,19 +319,14 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
// Sort offsets in-place.
Collections.sort(
wrappedOffsets,
- new Comparator<Integer>()
- {
- @Override
- public int compare(Integer lhs, Integer rhs)
- {
- final ByteBuffer tableBuffer = hashTable.getTableBuffer();
- return comparator.compare(
- tableBuffer,
- tableBuffer,
- lhs + HASH_SIZE,
- rhs + HASH_SIZE
- );
- }
+ (lhs, rhs) -> {
+ final ByteBuffer tableBuffer = hashTable.getTableBuffer();
+ return comparator.compare(
+ tableBuffer,
+ tableBuffer,
+ lhs + HASH_SIZE,
+ rhs + HASH_SIZE
+ );
}
);
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index a39441d..836f36f 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -363,6 +363,7 @@ public class GroupByQueryEngineV2
protected final GroupByColumnSelectorPlus[] dims;
protected final DateTime timestamp;
+ @Nullable
protected CloseableGrouperIterator<KeyType, Row> delegate = null;
protected final boolean allSingleValueDims;
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
index de5da62..2c75c35 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
@@ -256,35 +256,27 @@ public class RowBasedGrouperHelper
valueTypes
);
- final Accumulator<AggregateResult, Row> accumulator = new Accumulator<AggregateResult, Row>()
- {
- @Override
- public AggregateResult accumulate(
- final AggregateResult priorResult,
- final Row row
- )
- {
- BaseQuery.checkInterrupted();
+ final Accumulator<AggregateResult, Row> accumulator = (priorResult, row) -> {
+ BaseQuery.checkInterrupted();
- if (priorResult != null && !priorResult.isOk()) {
- // Pass-through error returns without doing more work.
- return priorResult;
- }
+ if (priorResult != null && !priorResult.isOk()) {
+ // Pass-through error returns without doing more work.
+ return priorResult;
+ }
- if (!grouper.isInitialized()) {
- grouper.init();
- }
+ if (!grouper.isInitialized()) {
+ grouper.init();
+ }
- columnSelectorRow.set(row);
+ columnSelectorRow.set(row);
- final Comparable[] key = new Comparable[keySize];
- valueExtractFn.apply(row, key);
+ final Comparable[] key = new Comparable[keySize];
+ valueExtractFn.apply(row, key);
- final AggregateResult aggregateResult = grouper.aggregate(new RowBasedKey(key));
- columnSelectorRow.set(null);
+ final AggregateResult aggregateResult = grouper.aggregate(new RowBasedKey(key));
+ columnSelectorRow.set(null);
- return aggregateResult;
- }
+ return aggregateResult;
};
return new Pair<>(grouper, accumulator);
@@ -302,33 +294,12 @@ public class RowBasedGrouperHelper
{
if (isInputRaw) {
if (query.getGranularity() instanceof AllGranularity) {
- return new TimestampExtractFunction()
- {
- @Override
- public long apply(Row row)
- {
- return query.getIntervals().get(0).getStartMillis();
- }
- };
+ return row -> query.getIntervals().get(0).getStartMillis();
} else {
- return new TimestampExtractFunction()
- {
- @Override
- public long apply(Row row)
- {
- return query.getGranularity().bucketStart(row.getTimestamp()).getMillis();
- }
- };
+ return row -> query.getGranularity().bucketStart(row.getTimestamp()).getMillis();
}
} else {
- return new TimestampExtractFunction()
- {
- @Override
- public long apply(Row row)
- {
- return row.getTimestampFromEpoch();
- }
- };
+ return Row::getTimestampFromEpoch;
}
}
@@ -358,60 +329,40 @@ public class RowBasedGrouperHelper
);
if (includeTimestamp) {
- return new ValueExtractFunction()
- {
- @Override
- public Comparable[] apply(Row row, Comparable[] key)
- {
- key[0] = timestampExtractFn.apply(row);
- for (int i = 1; i < key.length; i++) {
- final Comparable val = inputRawSuppliers[i - 1].get();
- key[i] = valueConvertFns[i - 1].apply(val);
- }
- return key;
+ return (row, key) -> {
+ key[0] = timestampExtractFn.apply(row);
+ for (int i = 1; i < key.length; i++) {
+ final Comparable val = inputRawSuppliers[i - 1].get();
+ key[i] = valueConvertFns[i - 1].apply(val);
}
+ return key;
};
} else {
- return new ValueExtractFunction()
- {
- @Override
- public Comparable[] apply(Row row, Comparable[] key)
- {
- for (int i = 0; i < key.length; i++) {
- final Comparable val = inputRawSuppliers[i].get();
- key[i] = valueConvertFns[i].apply(val);
- }
- return key;
+ return (row, key) -> {
+ for (int i = 0; i < key.length; i++) {
+ final Comparable val = inputRawSuppliers[i].get();
+ key[i] = valueConvertFns[i].apply(val);
}
+ return key;
};
}
} else {
if (includeTimestamp) {
- return new ValueExtractFunction()
- {
- @Override
- public Comparable[] apply(Row row, Comparable[] key)
- {
- key[0] = timestampExtractFn.apply(row);
- for (int i = 1; i < key.length; i++) {
- final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i - 1).getOutputName());
- key[i] = valueConvertFns[i - 1].apply(val);
- }
- return key;
+ return (row, key) -> {
+ key[0] = timestampExtractFn.apply(row);
+ for (int i = 1; i < key.length; i++) {
+ final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i - 1).getOutputName());
+ key[i] = valueConvertFns[i - 1].apply(val);
}
+ return key;
};
} else {
- return new ValueExtractFunction()
- {
- @Override
- public Comparable[] apply(Row row, Comparable[] key)
- {
- for (int i = 0; i < key.length; i++) {
- final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i).getOutputName());
- key[i] = valueConvertFns[i].apply(val);
- }
- return key;
+ return (row, key) -> {
+ for (int i = 0; i < key.length; i++) {
+ final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i).getOutputName());
+ key[i] = valueConvertFns[i].apply(val);
}
+ return key;
};
}
}
@@ -437,56 +388,51 @@ public class RowBasedGrouperHelper
return new CloseableGrouperIterator<>(
grouper.iterator(true),
- new Function<Grouper.Entry<RowBasedKey>, Row>()
- {
- @Override
- public Row apply(Grouper.Entry<RowBasedKey> entry)
- {
- Map<String, Object> theMap = Maps.newLinkedHashMap();
-
- // Get timestamp, maybe.
- final DateTime timestamp;
- final int dimStart;
-
- if (includeTimestamp) {
- timestamp = query.getGranularity().toDateTime(((long) (entry.getKey().getKey()[0])));
- dimStart = 1;
- } else {
- timestamp = null;
- dimStart = 0;
- }
+ entry -> {
+ Map<String, Object> theMap = Maps.newLinkedHashMap();
- // Add dimensions.
- if (dimsToInclude == null) {
- for (int i = dimStart; i < entry.getKey().getKey().length; i++) {
- Object dimVal = entry.getKey().getKey()[i];
- theMap.put(
- query.getDimensions().get(i - dimStart).getOutputName(),
- dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal
- );
- }
- } else {
- Map<String, Object> dimensions = new HashMap<>();
- for (int i = dimStart; i < entry.getKey().getKey().length; i++) {
- Object dimVal = entry.getKey().getKey()[i];
- dimensions.put(
- query.getDimensions().get(i - dimStart).getOutputName(),
- dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal
- );
- }
-
- for (String dimToInclude : dimsToInclude) {
- theMap.put(dimToInclude, dimensions.get(dimToInclude));
- }
+ // Get timestamp, maybe.
+ final DateTime timestamp;
+ final int dimStart;
+
+ if (includeTimestamp) {
+ timestamp = query.getGranularity().toDateTime(((long) (entry.getKey().getKey()[0])));
+ dimStart = 1;
+ } else {
+ timestamp = null;
+ dimStart = 0;
+ }
+
+ // Add dimensions.
+ if (dimsToInclude == null) {
+ for (int i = dimStart; i < entry.getKey().getKey().length; i++) {
+ Object dimVal = entry.getKey().getKey()[i];
+ theMap.put(
+ query.getDimensions().get(i - dimStart).getOutputName(),
+ dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal
+ );
+ }
+ } else {
+ Map<String, Object> dimensions = new HashMap<>();
+ for (int i = dimStart; i < entry.getKey().getKey().length; i++) {
+ Object dimVal = entry.getKey().getKey()[i];
+ dimensions.put(
+ query.getDimensions().get(i - dimStart).getOutputName(),
+ dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal
+ );
}
- // Add aggregations.
- for (int i = 0; i < entry.getValues().length; i++) {
- theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]);
+ for (String dimToInclude : dimsToInclude) {
+ theMap.put(dimToInclude, dimensions.get(dimToInclude));
}
+ }
- return new MapBasedRow(timestamp, theMap);
+ // Add aggregations.
+ for (int i = 0; i < entry.getValues().length; i++) {
+ theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]);
}
+
+ return new MapBasedRow(timestamp, theMap);
},
closeable
);
@@ -713,47 +659,30 @@ public class RowBasedGrouperHelper
if (includeTimestamp) {
if (sortByDimsFirst) {
- return new Comparator<Grouper.Entry<RowBasedKey>>()
- {
- @Override
- public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
- {
- final int cmp = compareDimsInRows(entry1.getKey(), entry2.getKey(), 1);
- if (cmp != 0) {
- return cmp;
- }
-
- return Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]);
+ return (entry1, entry2) -> {
+ final int cmp = compareDimsInRows(entry1.getKey(), entry2.getKey(), 1);
+ if (cmp != 0) {
+ return cmp;
}
+
+ return Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]);
};
} else {
- return new Comparator<Grouper.Entry<RowBasedKey>>()
- {
- @Override
- public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
- {
- final int timeCompare = Longs.compare(
- (long) entry1.getKey().getKey()[0],
- (long) entry2.getKey().getKey()[0]
- );
-
- if (timeCompare != 0) {
- return timeCompare;
- }
+ return (entry1, entry2) -> {
+ final int timeCompare = Longs.compare(
+ (long) entry1.getKey().getKey()[0],
+ (long) entry2.getKey().getKey()[0]
+ );
- return compareDimsInRows(entry1.getKey(), entry2.getKey(), 1);
+ if (timeCompare != 0) {
+ return timeCompare;
}
+
+ return compareDimsInRows(entry1.getKey(), entry2.getKey(), 1);
};
}
} else {
- return new Comparator<Grouper.Entry<RowBasedKey>>()
- {
- @Override
- public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
- {
- return compareDimsInRows(entry1.getKey(), entry2.getKey(), 0);
- }
- };
+ return (entry1, entry2) -> compareDimsInRows(entry1.getKey(), entry2.getKey(), 0);
}
}
@@ -804,74 +733,57 @@ public class RowBasedGrouperHelper
if (includeTimestamp) {
if (sortByDimsFirst) {
- return new Comparator<Grouper.Entry<RowBasedKey>>()
- {
- @Override
- public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
- {
- final int cmp = compareDimsInRowsWithAggs(
- entry1,
- entry2,
- 1,
- needsReverses,
- aggFlags,
- fieldIndices,
- isNumericField,
- comparators
- );
- if (cmp != 0) {
- return cmp;
- }
-
- return Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]);
+ return (entry1, entry2) -> {
+ final int cmp = compareDimsInRowsWithAggs(
+ entry1,
+ entry2,
+ 1,
+ needsReverses,
+ aggFlags,
+ fieldIndices,
+ isNumericField,
+ comparators
+ );
+ if (cmp != 0) {
+ return cmp;
}
+
+ return Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]);
};
} else {
- return new Comparator<Grouper.Entry<RowBasedKey>>()
- {
- @Override
- public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
- {
- final int timeCompare = Longs.compare(
- (long) entry1.getKey().getKey()[0],
- (long) entry2.getKey().getKey()[0]
- );
+ return (entry1, entry2) -> {
+ final int timeCompare = Longs.compare(
+ (long) entry1.getKey().getKey()[0],
+ (long) entry2.getKey().getKey()[0]
+ );
- if (timeCompare != 0) {
- return timeCompare;
- }
-
- return compareDimsInRowsWithAggs(
- entry1,
- entry2,
- 1,
- needsReverses,
- aggFlags,
- fieldIndices,
- isNumericField,
- comparators
- );
+ if (timeCompare != 0) {
+ return timeCompare;
}
- };
- }
- } else {
- return new Comparator<Grouper.Entry<RowBasedKey>>()
- {
- @Override
- public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
- {
+
return compareDimsInRowsWithAggs(
entry1,
entry2,
- 0,
+ 1,
needsReverses,
aggFlags,
fieldIndices,
isNumericField,
comparators
);
- }
- };
+ };
+ }
+ } else {
+ return (entry1, entry2) -> compareDimsInRowsWithAggs(
+ entry1,
+ entry2,
+ 0,
+ needsReverses,
+ aggFlags,
+ fieldIndices,
+ isNumericField,
+ comparators
+ );
}
}
@@ -981,6 +893,7 @@ public class RowBasedGrouperHelper
// dictionary id -> rank of the sorted dictionary
// This is initialized in the constructor and bufferComparator() with static dictionary and dynamic dictionary,
// respectively.
+ @Nullable
private int[] rankOfDictionaryIds = null;
RowBasedKeySerde(
@@ -1118,68 +1031,53 @@ public class RowBasedGrouperHelper
if (includeTimestamp) {
if (sortByDimsFirst) {
- return new Grouper.BufferComparator()
- {
- @Override
- public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
- {
- final int cmp = compareDimsInBuffersForNullFudgeTimestamp(
- serdeHelperComparators,
- lhsBuffer,
- rhsBuffer,
- lhsPosition,
- rhsPosition
- );
- if (cmp != 0) {
- return cmp;
- }
-
- return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
+ return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+ final int cmp = compareDimsInBuffersForNullFudgeTimestamp(
+ serdeHelperComparators,
+ lhsBuffer,
+ rhsBuffer,
+ lhsPosition,
+ rhsPosition
+ );
+ if (cmp != 0) {
+ return cmp;
}
+
+ return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
};
} else {
- return new Grouper.BufferComparator()
- {
- @Override
- public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
- {
- final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
-
- if (timeCompare != 0) {
- return timeCompare;
- }
-
- return compareDimsInBuffersForNullFudgeTimestamp(
- serdeHelperComparators,
- lhsBuffer,
- rhsBuffer,
- lhsPosition,
- rhsPosition
- );
+ return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+ final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
+
+ if (timeCompare != 0) {
+ return timeCompare;
}
+
+ return compareDimsInBuffersForNullFudgeTimestamp(
+ serdeHelperComparators,
+ lhsBuffer,
+ rhsBuffer,
+ lhsPosition,
+ rhsPosition
+ );
};
}
} else {
- return new Grouper.BufferComparator()
- {
- @Override
- public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
- {
- for (int i = 0; i < dimCount; i++) {
- final int cmp = serdeHelperComparators[i].compare(
- lhsBuffer,
- rhsBuffer,
- lhsPosition,
- rhsPosition
- );
+ return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+ for (int i = 0; i < dimCount; i++) {
+ final int cmp = serdeHelperComparators[i].compare(
+ lhsBuffer,
+ rhsBuffer,
+ lhsPosition,
+ rhsPosition
+ );
- if (cmp != 0) {
- return cmp;
- }
+ if (cmp != 0) {
+ return cmp;
}
-
- return 0;
}
+
+ return 0;
};
}
}
@@ -1246,84 +1144,69 @@ public class RowBasedGrouperHelper
if (includeTimestamp) {
if (sortByDimsFirst) {
- return new Grouper.BufferComparator()
- {
- @Override
- public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
- {
- final int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown(
- adjustedSerdeHelperComparators,
- needsReverses,
- fieldCount,
- lhsBuffer,
- rhsBuffer,
- lhsPosition,
- rhsPosition
- );
- if (cmp != 0) {
- return cmp;
- }
-
- return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
+ return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+ final int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown(
+ adjustedSerdeHelperComparators,
+ needsReverses,
+ fieldCount,
+ lhsBuffer,
+ rhsBuffer,
+ lhsPosition,
+ rhsPosition
+ );
+ if (cmp != 0) {
+ return cmp;
}
+
+ return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
};
} else {
- return new Grouper.BufferComparator()
- {
- @Override
- public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
- {
- final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
-
- if (timeCompare != 0) {
- return timeCompare;
- }
-
- int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown(
- adjustedSerdeHelperComparators,
- needsReverses,
- fieldCount,
+ return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+ final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
+
+ if (timeCompare != 0) {
+ return timeCompare;
+ }
+
+ int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown(
+ adjustedSerdeHelperComparators,
+ needsReverses,
+ fieldCount,
+ lhsBuffer,
+ rhsBuffer,
+ lhsPosition,
+ rhsPosition
+ );
+
+ return cmp;
+ };
+ }
+ } else {
+ return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+ for (int i = 0; i < fieldCount; i++) {
+ final int cmp;
+ if (needsReverses.get(i)) {
+ cmp = adjustedSerdeHelperComparators[i].compare(
+ rhsBuffer,
+ lhsBuffer,
+ rhsPosition,
+ lhsPosition
+ );
+ } else {
+ cmp = adjustedSerdeHelperComparators[i].compare(
lhsBuffer,
rhsBuffer,
lhsPosition,
rhsPosition
);
+ }
+ if (cmp != 0) {
return cmp;
}
- };
- }
- } else {
- return new Grouper.BufferComparator()
- {
- @Override
- public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
- {
- for (int i = 0; i < fieldCount; i++) {
- final int cmp;
- if (needsReverses.get(i)) {
- cmp = adjustedSerdeHelperComparators[i].compare(
- rhsBuffer,
- lhsBuffer,
- rhsPosition,
- lhsPosition
- );
- } else {
- cmp = adjustedSerdeHelperComparators[i].compare(
- lhsBuffer,
- rhsBuffer,
- lhsPosition,
- rhsPosition
- );
- }
-
- if (cmp != 0) {
- return cmp;
- }
- }
-
- return 0;
}
+
+ return 0;
};
}
}
diff --git a/processing/src/main/java/org/apache/druid/segment/MetricHolder.java b/processing/src/main/java/org/apache/druid/segment/MetricHolder.java
index 7705735..8f4ee20 100644
--- a/processing/src/main/java/org/apache/druid/segment/MetricHolder.java
+++ b/processing/src/main/java/org/apache/druid/segment/MetricHolder.java
@@ -26,6 +26,8 @@ import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.ComplexMetrics;
+import javax.annotation.Nullable;
+
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -96,7 +98,9 @@ public class MetricHolder
private final String name;
private final String typeName;
private final MetricType type;
+ @Nullable
CompressedColumnarFloatsSupplier floatType = null;
+ @Nullable
GenericIndexed<?> complexType = null;
private MetricHolder(
diff --git a/processing/src/main/java/org/apache/druid/segment/SingleScanTimeDimensionSelector.java b/processing/src/main/java/org/apache/druid/segment/SingleScanTimeDimensionSelector.java
index 8a1be5a..1cf3efb 100644
--- a/processing/src/main/java/org/apache/druid/segment/SingleScanTimeDimensionSelector.java
+++ b/processing/src/main/java/org/apache/druid/segment/SingleScanTimeDimensionSelector.java
@@ -46,10 +46,12 @@ public class SingleScanTimeDimensionSelector implements DimensionSelector
private final List<String> timeValues = new ArrayList<>();
private final SingleIndexedInt row = new SingleIndexedInt();
- private String currentValue = null;
private long currentTimestamp = Long.MIN_VALUE;
private int index = -1;
+ @Nullable
+ private String currentValue = null;
+
public SingleScanTimeDimensionSelector(
BaseLongColumnValueSelector selector,
@Nullable ExtractionFn extractionFn,
diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
index cf201e4..332da92 100644
--- a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
+++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
@@ -232,9 +232,11 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
private final DimensionDictionary dimLookup;
private final MultiValueHandling multiValueHandling;
private final boolean hasBitmapIndexes;
- private SortedDimensionDictionary sortedLookup;
private boolean hasMultipleValues = false;
+ @Nullable
+ private SortedDimensionDictionary sortedLookup;
+
public StringDimensionIndexer(MultiValueHandling multiValueHandling, boolean hasBitmapIndexes)
{
this.dimLookup = new DimensionDictionary();
@@ -252,7 +254,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
final int nullId = dimLookup.getId(null);
encodedDimensionValues = nullId == ABSENT_VALUE_ID ? new int[]{dimLookup.add(null)} : new int[]{nullId};
} else if (dimValues instanceof List) {
- List<Object> dimValuesList = (List) dimValues;
+ List<Object> dimValuesList = (List<Object>) dimValues;
if (dimValuesList.isEmpty()) {
dimLookup.add(null);
encodedDimensionValues = IntArrays.EMPTY_ARRAY;
diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
index 050163b..6340502 100644
--- a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
@@ -77,27 +77,36 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
private static final Indexed<String> NULL_STR_DIM_VAL = new ListIndexed<>(Collections.singletonList(null));
private static final Splitter SPLITTER = Splitter.on(",");
- private ColumnarIntsSerializer encodedValueSerializer;
+ private final String dimensionName;
+ private final ProgressIndicator progress;
+ private final Closer closer;
+ private final IndexSpec indexSpec;
+ private final SegmentWriteOutMedium segmentWriteOutMedium;
+ private final MutableBitmap nullRowsBitmap;
+ private final ColumnCapabilities capabilities;
- private String dimensionName;
- private GenericIndexedWriter<String> dictionaryWriter;
- private String firstDictionaryValue;
private int dictionarySize;
+ private int rowCount = 0;
+ private int cardinality = 0;
+ private boolean hasNull = false;
+
+ @Nullable
private GenericIndexedWriter<ImmutableBitmap> bitmapWriter;
+ @Nullable
private ByteBufferWriter<ImmutableRTree> spatialWriter;
+ @Nullable
private ArrayList<IntBuffer> dimConversions;
- private int cardinality = 0;
- private boolean hasNull = false;
- private MutableBitmap nullRowsBitmap;
- private final SegmentWriteOutMedium segmentWriteOutMedium;
- private int rowCount = 0;
- private ColumnCapabilities capabilities;
+ @Nullable
private List<IndexableAdapter> adapters;
- private final IndexSpec indexSpec;
+ @Nullable
private IndexMerger.DictionaryMergeIterator dictionaryMergeIterator;
+ @Nullable
+ private ColumnarIntsSerializer encodedValueSerializer;
+ @Nullable
+ private GenericIndexedWriter<String> dictionaryWriter;
+ @Nullable
+ private String firstDictionaryValue;
- private final ProgressIndicator progress;
- private final Closer closer;
public StringDimensionMergerV9(
String dimensionName,
@@ -537,13 +546,12 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
.withBitmapIndex(bitmapWriter)
.withSpatialIndex(spatialWriter)
.withByteOrder(IndexIO.BYTE_ORDER);
- final ColumnDescriptor serdeficator = builder
- .addSerde(partBuilder.build())
- .build();
//log.info("Completed dimension column[%s] in %,d millis.", dimensionName, System.currentTimeMillis() - dimStartTime);
- return serdeficator;
+ return builder
+ .addSerde(partBuilder.build())
+ .build();
}
protected interface IndexSeeker
diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java
index ce081df..86b9b86 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java
@@ -23,18 +23,25 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
+import javax.annotation.Nullable;
+
/**
*/
public class ColumnBuilder
{
+ @Nullable
private ValueType type = null;
private boolean hasMultipleValues = false;
private boolean filterable = false;
+ private boolean dictionaryEncoded = false;
+ @Nullable
private Supplier<? extends BaseColumn> columnSupplier = null;
- private boolean dictionaryEncoded = false;
+ @Nullable
private Supplier<BitmapIndex> bitmapIndex = null;
+ @Nullable
private Supplier<SpatialIndex> spatialIndex = null;
+ @Nullable
private SmooshedFileMapper fileMapper = null;
public ColumnBuilder setFileMapper(SmooshedFileMapper fileMapper)
diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java
index 620a0f6..94cdbe3 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java
@@ -23,12 +23,16 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.ISE;
+import javax.annotation.Nullable;
+
/**
*
*/
public class ColumnCapabilitiesImpl implements ColumnCapabilities
{
+ @Nullable
private ValueType type = null;
+
private boolean dictionaryEncoded = false;
private boolean runLengthEncoded = false;
private boolean hasInvertedIndexes = false;
diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java
index 1307640..fbb5b1d 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java
@@ -28,6 +28,8 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.segment.serde.ColumnPartSerde;
import org.apache.druid.segment.serde.Serializer;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
@@ -111,7 +113,9 @@ public class ColumnDescriptor implements Serializer
public static class Builder
{
+ @Nullable
private ValueType valueType = null;
+ @Nullable
private Boolean hasMultipleValues = null;
private final List<ColumnPartSerde> parts = new ArrayList<>();
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
index 8c2db09..3558473 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
@@ -25,6 +25,8 @@ import org.apache.druid.segment.CompressedPools;
import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -45,6 +47,7 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
private final CompressionStrategy compression;
private int numInserted = 0;
+ @Nullable
private ByteBuffer endBuffer;
BlockLayoutColumnarDoublesSerializer(
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
index 84d2e26..aa225b8 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
@@ -25,6 +25,8 @@ import org.apache.druid.segment.CompressedPools;
import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -45,6 +47,7 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
private final CompressionStrategy compression;
private int numInserted = 0;
+ @Nullable
private ByteBuffer endBuffer;
BlockLayoutColumnarFloatsSerializer(
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
index c1851c7..778a061 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
@@ -24,6 +24,8 @@ import org.apache.druid.segment.CompressedPools;
import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -47,6 +49,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
private int numInserted = 0;
private int numInsertedForNextFlush;
+ @Nullable
private ByteBuffer endBuffer;
BlockLayoutColumnarLongsSerializer(
diff --git a/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java b/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
index 7152944..ee1e781 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
@@ -26,6 +26,8 @@ import org.apache.druid.segment.serde.Serializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.WriteOutBytes;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
@@ -36,7 +38,9 @@ public class ByteBufferWriter<T> implements Serializer
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final ObjectStrategy<T> strategy;
+ @Nullable
private WriteOutBytes headerOut = null;
+ @Nullable
private WriteOutBytes valueOut = null;
public ByteBufferWriter(SegmentWriteOutMedium segmentWriteOutMedium, ObjectStrategy<T> strategy)
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
index dacf62f..a5db9e2 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
@@ -24,6 +24,8 @@ import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -45,9 +47,11 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
private final int chunkFactor;
private final CompressionStrategy compression;
private final GenericIndexedWriter<ByteBuffer> flattener;
- private ByteBuffer endBuffer;
private int numInserted;
+ @Nullable
+ private ByteBuffer endBuffer;
+
CompressedColumnarIntsSerializer(
final SegmentWriteOutMedium segmentWriteOutMedium,
final String filenameBase,
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
index ffc1ee8..72f9fc9 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
@@ -25,6 +25,8 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -67,9 +69,10 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
private final CompressionStrategy compression;
private final GenericIndexedWriter<ByteBuffer> flattener;
private final ByteBuffer intBuffer;
+ private int numInserted;
+ @Nullable
private ByteBuffer endBuffer;
- private int numInserted;
CompressedVSizeColumnarIntsSerializer(
final SegmentWriteOutMedium segmentWriteOutMedium,
diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
index e44a200..8a2bc57 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
@@ -198,6 +198,7 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
private int logBaseTwoOfElementsPerValueFile;
private int relativeIndexMask;
+ @Nullable
private final ByteBuffer theBuffer;
/**
diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
index 77bcd91..120b36b 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
@@ -140,10 +140,13 @@ public class GenericIndexedWriter<T> implements Serializer
private boolean objectsSorted = true;
@Nullable
private T prevObject = null;
+ @Nullable
private WriteOutBytes headerOut = null;
+ @Nullable
private WriteOutBytes valuesOut = null;
private int numWritten = 0;
private boolean requireMultipleFiles = false;
+ @Nullable
private LongList headerOutLong;
private final ByteBuffer getOffsetBuffer = ByteBuffer.allocate(Integer.BYTES);
diff --git a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
index e57e81d..e08e463 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
@@ -27,6 +27,8 @@ import it.unimi.dsi.fastutil.longs.LongList;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
@@ -42,7 +44,6 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali
private final String filenameBase;
private final ByteOrder order;
private final CompressionStrategy compression;
- private LongList tempOut = null;
private int numInserted = 0;
@@ -52,6 +53,9 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali
private long maxVal = Long.MIN_VALUE;
private long minVal = Long.MAX_VALUE;
+ @Nullable
+ private LongList tempOut = null;
+ @Nullable
private ColumnarLongsSerializer delegate;
IntermediateColumnarLongsSerializer(
diff --git a/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java b/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
index b421105..2aeb194 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
@@ -21,6 +21,8 @@ package org.apache.druid.segment.data;
import org.apache.druid.segment.writeout.WriteOutBytes;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -31,7 +33,9 @@ public class LongsLongEncodingWriter implements CompressionFactory.LongEncodingW
private final ByteBuffer orderBuffer;
private final ByteOrder order;
+ @Nullable
private ByteBuffer outBuffer = null;
+ @Nullable
private OutputStream outStream = null;
public LongsLongEncodingWriter(ByteOrder order)
diff --git a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarIntsSerializer.java
index 90d4c20..ff646fc 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarIntsSerializer.java
@@ -26,6 +26,8 @@ import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.WriteOutBytes;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
@@ -46,9 +48,11 @@ public class VSizeColumnarIntsSerializer extends SingleValueColumnarIntsSerializ
private final int numBytes;
private final ByteBuffer helperBuffer = ByteBuffer.allocate(Integer.BYTES);
- private WriteOutBytes valuesOut = null;
private boolean bufPaddingWritten = false;
+ @Nullable
+ private WriteOutBytes valuesOut = null;
+
public VSizeColumnarIntsSerializer(final SegmentWriteOutMedium segmentWriteOutMedium, final int maxValue)
{
this.segmentWriteOutMedium = segmentWriteOutMedium;
diff --git a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java
index d183d357..6a85a46 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java
@@ -26,6 +26,8 @@ import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.WriteOutBytes;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
@@ -83,7 +85,9 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize
private final WriteInt writeInt;
private final SegmentWriteOutMedium segmentWriteOutMedium;
+ @Nullable
private WriteOutBytes headerOut = null;
+ @Nullable
private WriteOutBytes valuesOut = null;
private int numWritten = 0;
private boolean numBytesForMaxWritten = false;
diff --git a/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java b/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java
index 3bfa68a..6741be5 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java
@@ -21,6 +21,8 @@ package org.apache.druid.segment.data;
import org.apache.druid.java.util.common.IAE;
+import javax.annotation.Nullable;
+
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
@@ -189,6 +191,7 @@ public class VSizeLongSerde
private static final class Size1Ser implements LongSerializer
{
+ @Nullable
OutputStream output = null;
ByteBuffer buffer;
byte curByte = 0;
@@ -242,6 +245,7 @@ public class VSizeLongSerde
private static final class Size2Ser implements LongSerializer
{
+ @Nullable
OutputStream output = null;
ByteBuffer buffer;
byte curByte = 0;
@@ -295,8 +299,8 @@ public class VSizeLongSerde
private static final class Mult4Ser implements LongSerializer
{
-
- OutputStream output = null;
+ @Nullable
+ OutputStream output;
ByteBuffer buffer;
int numBytes;
byte curByte = 0;
@@ -361,6 +365,7 @@ public class VSizeLongSerde
private static final class Mult8Ser implements LongSerializer
{
+ @Nullable
OutputStream output;
ByteBuffer buffer;
int numBytes;
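The Mult4Ser hunk above also drops the explicit '= null' initializer while adding the annotation. Java zero-initializes reference fields to null, so the assignment was redundant; with @Nullable present, the field's lifecycle is documented without the extra store. A two-line illustration (hypothetical class):

    import javax.annotation.Nullable;
    import java.io.OutputStream;

    class Defaults
    {
      @Nullable
      OutputStream explicit = null; // redundant: reference fields already default to null

      @Nullable
      OutputStream implicit;        // identical runtime value, one fewer null store
    }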
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index 515c475..8e21296 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -330,6 +330,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
public static class Builder
{
+ @Nullable
private IncrementalIndexSchema incrementalIndexSchema;
private boolean deserializeComplexMetrics;
private boolean reportParseExceptions;
@@ -505,8 +506,8 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
static class IncrementalIndexRowResult
{
- private IncrementalIndexRow incrementalIndexRow;
- private List<String> parseExceptionMessages;
+ private final IncrementalIndexRow incrementalIndexRow;
+ private final List<String> parseExceptionMessages;
IncrementalIndexRowResult(IncrementalIndexRow incrementalIndexRow, List<String> parseExceptionMessages)
{
@@ -527,9 +528,9 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
static class AddToFactsResult
{
- private int rowCount;
+ private final int rowCount;
private final long bytesInMemory;
- private List<String> parseExceptionMessages;
+ private final List<String> parseExceptionMessages;
public AddToFactsResult(
int rowCount,
@@ -997,53 +998,48 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs, final boolean descending)
{
- return new Iterable<Row>()
- {
- @Override
- public Iterator<Row> iterator()
- {
- final List<DimensionDesc> dimensions = getDimensions();
-
- return Iterators.transform(
- getFacts().iterator(descending),
- incrementalIndexRow -> {
- final int rowOffset = incrementalIndexRow.getRowIndex();
-
- Object[] theDims = incrementalIndexRow.getDims();
-
- Map<String, Object> theVals = Maps.newLinkedHashMap();
- for (int i = 0; i < theDims.length; ++i) {
- Object dim = theDims[i];
- DimensionDesc dimensionDesc = dimensions.get(i);
- if (dimensionDesc == null) {
- continue;
- }
- String dimensionName = dimensionDesc.getName();
- DimensionHandler handler = dimensionDesc.getHandler();
- if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) {
- theVals.put(dimensionName, null);
- continue;
- }
- final DimensionIndexer indexer = dimensionDesc.getIndexer();
- Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualList(dim);
- theVals.put(dimensionName, rowVals);
+ return () -> {
+ final List<DimensionDesc> dimensions = getDimensions();
+
+ return Iterators.transform(
+ getFacts().iterator(descending),
+ incrementalIndexRow -> {
+ final int rowOffset = incrementalIndexRow.getRowIndex();
+
+ Object[] theDims = incrementalIndexRow.getDims();
+
+ Map<String, Object> theVals = Maps.newLinkedHashMap();
+ for (int i = 0; i < theDims.length; ++i) {
+ Object dim = theDims[i];
+ DimensionDesc dimensionDesc = dimensions.get(i);
+ if (dimensionDesc == null) {
+ continue;
}
-
- AggregatorType[] aggs = getAggsForRow(rowOffset);
- for (int i = 0; i < aggs.length; ++i) {
- theVals.put(metrics[i].getName(), getAggVal(aggs[i], rowOffset, i));
+ String dimensionName = dimensionDesc.getName();
+ DimensionHandler handler = dimensionDesc.getHandler();
+ if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) {
+ theVals.put(dimensionName, null);
+ continue;
}
+ final DimensionIndexer indexer = dimensionDesc.getIndexer();
+ Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualList(dim);
+ theVals.put(dimensionName, rowVals);
+ }
- if (postAggs != null) {
- for (PostAggregator postAgg : postAggs) {
- theVals.put(postAgg.getName(), postAgg.compute(theVals));
- }
- }
+ AggregatorType[] aggs = getAggsForRow(rowOffset);
+ for (int i = 0; i < aggs.length; ++i) {
+ theVals.put(metrics[i].getName(), getAggVal(aggs[i], rowOffset, i));
+ }
- return new MapBasedRow(incrementalIndexRow.getTimestamp(), theVals);
+ if (postAggs != null) {
+ for (PostAggregator postAgg : postAggs) {
+ theVals.put(postAgg.getName(), postAgg.compute(theVals));
+ }
}
- );
- }
+
+ return new MapBasedRow(incrementalIndexRow.getTimestamp(), theVals);
+ }
+ );
};
}
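Two independent cleanups ride along in this file: IncrementalIndexRowResult and AddToFactsResult get final fields, making those result holders immutable after construction, and the anonymous Iterable<Row> in iterableWithPostAggregations collapses into a lambda. The latter is purely mechanical, since Iterable's only abstract method is iterator(); a minimal sketch of the same transformation over a simple element type:

    import java.util.Arrays;
    import java.util.Iterator;

    class IterableExample
    {
      // Before: an anonymous class implementing the single abstract method.
      static Iterable<String> anonymous()
      {
        return new Iterable<String>()
        {
          @Override
          public Iterator<String> iterator()
          {
            return Arrays.asList("a", "b").iterator();
          }
        };
      }

      // After: a lambda; the body and semantics are unchanged, and no
      // enclosing 'this' needs to be captured.
      static Iterable<String> lambda()
      {
        return () -> Arrays.asList("a", "b").iterator();
      }
    }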
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java
index 2493bdd..b896c9e 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java
@@ -48,6 +48,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
private static class DimensionAccessor
{
private final IncrementalIndex.DimensionDesc dimensionDesc;
+ @Nullable
private final MutableBitmap[] invertedIndexes;
private final DimensionIndexer indexer;
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowHolder.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowHolder.java
index b9180dd..c46ef55 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowHolder.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowHolder.java
@@ -22,6 +22,8 @@ package org.apache.druid.segment.incremental;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.LongColumnSelector;
+import javax.annotation.Nullable;
+
/**
* IncrementalIndexRowHolder is a simple {@link #get}/{@link #set} holder of {@link IncrementalIndexRow}. It is used
* to implement various machinery around {@link IncrementalIndex}, e. g. {@link
@@ -33,6 +35,7 @@ import org.apache.druid.segment.LongColumnSelector;
*/
public class IncrementalIndexRowHolder implements LongColumnSelector
{
+ @Nullable
private IncrementalIndexRow currEntry = null;
public IncrementalIndexRow get()
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
index 95c88fc..ac48901 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -33,6 +33,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.segment.ColumnSelectorFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -59,14 +61,17 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
protected final int maxRowCount;
+ @Nullable
private volatile Map<String, ColumnSelectorFactory> selectors;
//given a ByteBuffer and an offset where all aggregates for a row are stored
//offset + aggOffsetInBuffer[i] would give position in ByteBuffer where ith aggregate
//is stored
+ @Nullable
private volatile int[] aggOffsetInBuffer;
private volatile int aggsTotalSize;
+ @Nullable
private String outOfRowsReason = null;
OffheapIncrementalIndex(
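aggOffsetInBuffer is a volatile array reference, and volatile only covers the reference itself, not the elements. That stays safe as long as the array is fully populated before being assigned to the field, which is the standard safe-publication idiom; a compact sketch, assuming (as the comment above suggests) that the offsets are computed once during initialization:

    import javax.annotation.Nullable;

    class SafePublication
    {
      @Nullable
      private volatile int[] offsets;

      void init(int n)
      {
        int[] tmp = new int[n];
        for (int i = 0; i < n; i++) {
          tmp[i] = i * Long.BYTES; // fill the array completely first
        }
        offsets = tmp; // the volatile write publishes the finished array
      }

      int offsetOf(int i)
      {
        int[] local = offsets; // read the field once into a local
        if (local == null) {
          throw new IllegalStateException("init() not called");
        }
        return local[i];
      }
    }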
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 80e21a0..8e5c55b 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -60,8 +60,10 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
private final long maxBytesPerRowForAggregators;
protected final int maxRowCount;
protected final long maxBytesInMemory;
- private volatile Map<String, ColumnSelectorFactory> selectors;
+ @Nullable
+ private volatile Map<String, ColumnSelectorFactory> selectors;
+ @Nullable
private String outOfRowsReason = null;
OnheapIncrementalIndex(
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
index c362351..f10f53b 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
@@ -22,16 +22,15 @@ package org.apache.druid.segment.serde;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.GenericColumnSerializer;
-import org.apache.druid.segment.column.ColumnBuilder;
-import org.apache.druid.segment.column.ColumnConfig;
-import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
/**
*/
public class ComplexColumnPartSerde implements ColumnPartSerde
{
private final String typeName;
+ @Nullable
private final ComplexMetricSerde serde;
private final Serializer serializer;
@@ -70,21 +69,18 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
@Override
public Deserializer getDeserializer()
{
- return new Deserializer()
- {
- @Override
- public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig)
- {
- if (serde != null) {
- serde.deserializeColumn(buffer, builder);
- }
+ return (buffer, builder, columnConfig) -> {
+ if (serde != null) {
+ serde.deserializeColumn(buffer, builder);
}
};
}
public static class SerializerBuilder
{
+ @Nullable
private String typeName = null;
+ @Nullable
private GenericColumnSerializer delegate = null;
public SerializerBuilder withTypeName(final String typeName)
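ColumnPartSerde.Deserializer can be written as a lambda here because any interface with exactly one abstract method is a functional interface, whether or not it carries @FunctionalInterface. The guard on the @Nullable serde field moves into the lambda body unchanged. An illustrative interface mirroring that shape (names are hypothetical):

    class FunctionalShape
    {
      // One abstract method is all a lambda target needs; no annotation required.
      interface Reader
      {
        void read(String input);
      }

      static Reader make()
      {
        return input -> System.out.println("read " + input);
      }
    }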
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
index 136eca6..7f0bc3d 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
@@ -143,13 +143,21 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
public static class SerializerBuilder
{
- private VERSION version = null;
private int flags = STARTING_FLAGS;
+
+ @Nullable
+ private VERSION version = null;
+ @Nullable
private GenericIndexedWriter<String> dictionaryWriter = null;
+ @Nullable
private ColumnarIntsSerializer valueWriter = null;
+ @Nullable
private BitmapSerdeFactory bitmapSerdeFactory = null;
+ @Nullable
private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = null;
+ @Nullable
private ByteBufferWriter<ImmutableRTree> spatialIndexWriter = null;
+ @Nullable
private ByteOrder byteOrder = null;
public SerializerBuilder withDictionary(GenericIndexedWriter<String> dictionaryWriter)
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java
index 82ebfc2..5a08e91 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java
@@ -23,14 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.column.ColumnBuilder;
-import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.ColumnarDoubles;
import org.apache.druid.segment.data.CompressedColumnarDoublesSuppliers;
import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
+
import java.nio.ByteOrder;
public class DoubleNumericColumnPartSerde implements ColumnPartSerde
@@ -66,7 +64,9 @@ public class DoubleNumericColumnPartSerde implements ColumnPartSerde
public static class SerializerBuilder
{
+ @Nullable
private ByteOrder byteOrder = null;
+ @Nullable
private Serializer delegate = null;
public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
@@ -97,24 +97,19 @@ public class DoubleNumericColumnPartSerde implements ColumnPartSerde
@Override
public Deserializer getDeserializer()
{
- return new Deserializer()
- {
- @Override
- public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig)
- {
- final Supplier<ColumnarDoubles> column = CompressedColumnarDoublesSuppliers.fromByteBuffer(
- buffer,
- byteOrder
- );
- DoubleNumericColumnSupplier columnSupplier = new DoubleNumericColumnSupplier(
- column,
- IndexIO.LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
- );
- builder.setType(ValueType.DOUBLE)
- .setHasMultipleValues(false)
- .setNumericColumnSupplier(columnSupplier);
-
- }
+ return (buffer, builder, columnConfig) -> {
+ final Supplier<ColumnarDoubles> column = CompressedColumnarDoublesSuppliers.fromByteBuffer(
+ buffer,
+ byteOrder
+ );
+ DoubleNumericColumnSupplier columnSupplier = new DoubleNumericColumnSupplier(
+ column,
+ IndexIO.LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
+ );
+ builder.setType(ValueType.DOUBLE)
+ .setHasMultipleValues(false)
+ .setNumericColumnSupplier(columnSupplier);
+
};
}
}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java
index b822dbf..2262184 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java
@@ -54,7 +54,7 @@ public class DoubleNumericColumnPartSerdeV2 implements ColumnPartSerde
private final ByteOrder byteOrder;
@Nullable
- private Serializer serializer;
+ private final Serializer serializer;
private final BitmapSerdeFactory bitmapSerdeFactory;
public DoubleNumericColumnPartSerdeV2(
@@ -87,8 +87,11 @@ public class DoubleNumericColumnPartSerdeV2 implements ColumnPartSerde
public static class SerializerBuilder
{
+ @Nullable
private ByteOrder byteOrder = null;
+ @Nullable
private Serializer delegate = null;
+ @Nullable
private BitmapSerdeFactory bitmapSerdeFactory = null;
public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java
index 46cc40b..02ac221 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java
@@ -66,7 +66,9 @@ public class FloatNumericColumnPartSerde implements ColumnPartSerde
public static class SerializerBuilder
{
+ @Nullable
private ByteOrder byteOrder = null;
+ @Nullable
private Serializer delegate = null;
public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java
index ccc748a..b3c71db 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java
@@ -52,7 +52,7 @@ public class FloatNumericColumnPartSerdeV2 implements ColumnPartSerde
private final ByteOrder byteOrder;
@Nullable
- private Serializer serializer;
+ private final Serializer serializer;
private final BitmapSerdeFactory bitmapSerdeFactory;
private FloatNumericColumnPartSerdeV2(
@@ -85,8 +85,11 @@ public class FloatNumericColumnPartSerdeV2 implements ColumnPartSerde
public static class SerializerBuilder
{
+ @Nullable
private ByteOrder byteOrder = null;
+ @Nullable
private Serializer delegate = null;
+ @Nullable
private BitmapSerdeFactory bitmapSerdeFactory = null;
public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java
index 2dda2eb..3884875 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java
@@ -22,13 +22,11 @@ package org.apache.druid.segment.serde;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.column.ColumnBuilder;
-import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.CompressedColumnarLongsSupplier;
import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
+
import java.nio.ByteOrder;
/**
@@ -66,7 +64,9 @@ public class LongNumericColumnPartSerde implements ColumnPartSerde
public static class SerializerBuilder
{
+ @Nullable
private ByteOrder byteOrder = null;
+ @Nullable
private Serializer delegate = null;
public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
@@ -97,23 +97,18 @@ public class LongNumericColumnPartSerde implements ColumnPartSerde
@Override
public Deserializer getDeserializer()
{
- return new Deserializer()
- {
- @Override
- public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig)
- {
- final CompressedColumnarLongsSupplier column = CompressedColumnarLongsSupplier.fromByteBuffer(
- buffer,
- byteOrder
- );
- LongNumericColumnSupplier columnSupplier = new LongNumericColumnSupplier(
- column,
- IndexIO.LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
- );
- builder.setType(ValueType.LONG)
- .setHasMultipleValues(false)
- .setNumericColumnSupplier(columnSupplier);
- }
+ return (buffer, builder, columnConfig) -> {
+ final CompressedColumnarLongsSupplier column = CompressedColumnarLongsSupplier.fromByteBuffer(
+ buffer,
+ byteOrder
+ );
+ LongNumericColumnSupplier columnSupplier = new LongNumericColumnSupplier(
+ column,
+ IndexIO.LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
+ );
+ builder.setType(ValueType.LONG)
+ .setHasMultipleValues(false)
+ .setNumericColumnSupplier(columnSupplier);
};
}
}
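A side effect of the lambda conversion in this file is the removed imports: the anonymous Deserializer had to spell out (and import) ByteBuffer, ColumnBuilder, and ColumnConfig for its read signature, while the lambda's parameter types are inferred from the target interface. The same effect over a JDK interface, for illustration:

    import java.util.function.BiFunction;

    class InferredParams
    {
      // Anonymous class: parameter types written out explicitly.
      BiFunction<Integer, Integer, Integer> anon = new BiFunction<Integer, Integer, Integer>()
      {
        @Override
        public Integer apply(Integer a, Integer b)
        {
          return a + b;
        }
      };

      // Lambda: (a, b) inferred as Integer from the target type.
      BiFunction<Integer, Integer, Integer> lambda = (a, b) -> a + b;
    }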
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java
index ad27d81..7a46c51 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java
@@ -50,14 +50,16 @@ public class LongNumericColumnPartSerdeV2 implements ColumnPartSerde
);
}
+ @Nullable
+ private final Serializer serializer;
+ @Nullable
private final ByteOrder byteOrder;
@Nullable
- private Serializer serializer;
private final BitmapSerdeFactory bitmapSerdeFactory;
private LongNumericColumnPartSerdeV2(
- ByteOrder byteOrder,
- BitmapSerdeFactory bitmapSerdeFactory,
+ @Nullable ByteOrder byteOrder,
+ @Nullable BitmapSerdeFactory bitmapSerdeFactory,
@Nullable Serializer serializer
)
{
@@ -85,8 +87,11 @@ public class LongNumericColumnPartSerdeV2 implements ColumnPartSerde
public static class SerializerBuilder
{
+ @Nullable
private ByteOrder byteOrder = null;
+ @Nullable
private Serializer delegate = null;
+ @Nullable
private BitmapSerdeFactory bitmapSerdeFactory = null;
public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
index d8c1ef9..6842f8a 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
@@ -104,7 +104,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
* {@link EventReceiverFirehose} that may change in the future.
*/
private final long maxIdleTimeMillis;
- private final @Nullable ChatHandlerProvider chatHandlerProvider;
+ private final ChatHandlerProvider chatHandlerProvider;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final EventReceiverFirehoseRegister eventReceiverFirehoseRegister;
@@ -225,6 +225,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
* This field and {@link #rowsRunOut} are not volatile because they are accessed only from {@link #hasMore()} and
* {@link #nextRow()} methods that are called from a single thread according to {@link Firehose} spec.
*/
+ @Nullable
private InputRow nextRow = null;
private boolean rowsRunOut = false;
@@ -241,7 +242,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
* If they were not volatile, NPE would be possible in {@link #delayedCloseExecutor}. See
* https://shipilev.net/blog/2016/close-encounters-of-jmm-kind/#wishful-hb-actual for explanations.
*/
+ @Nullable
private volatile Long idleCloseTimeNs = null;
+ @Nullable
private volatile Long requestedShutdownTimeNs = null;
EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
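idleCloseTimeNs and requestedShutdownTimeNs are boxed Long rather than primitive long precisely so that null can mean "not scheduled"; @Nullable now states that, and volatile keeps the cross-thread null checks in delayedCloseExecutor sound, per the JMM discussion linked in the comment above. A compact sketch of the idiom with an illustrative field name:

    import javax.annotation.Nullable;

    class IdleTimer
    {
      // Boxed so null can represent "no deadline"; volatile so the watcher
      // thread observes writes made by the request thread.
      @Nullable
      private volatile Long deadlineNs = null;

      void schedule(long ns)
      {
        deadlineNs = ns;
      }

      void cancel()
      {
        deadlineNs = null;
      }

      boolean expired(long nowNs)
      {
        Long deadline = deadlineNs; // read once; the field may be nulled concurrently
        return deadline != null && nowNs >= deadline;
      }
    }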
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java
index b8c6b85..446efa3 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java
@@ -40,6 +40,7 @@ public class PredicateFirehose implements Firehose
private final Firehose firehose;
private final Predicate<InputRow> predicate;
+ @Nullable
private InputRow savedInputRow = null;
public PredicateFirehose(Firehose firehose, Predicate<InputRow> predicate)
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java
index 43738f8..866be52 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java
@@ -35,6 +35,8 @@ import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.ResultSetException;
import org.skife.jdbi.v2.exceptions.StatementException;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -51,6 +53,7 @@ public class SqlFirehoseFactory extends PrefetchSqlFirehoseFactory<String>
{
@JsonProperty
private final List<String> sqls;
+ @Nullable
@JsonProperty
private final MetadataStorageConnectorConfig connectorConfig;
private final ObjectMapper objectMapper;
diff --git a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
index fd53a1c..5c521bb 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
@@ -69,7 +69,8 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer
private final ConcurrentMap<DataSegment, SegmentZNode> segmentLookup = new ConcurrentHashMap<>();
private final Function<DataSegment, DataSegment> segmentTransformer;
- private final ChangeRequestHistory<DataSegmentChangeRequest> changes = new ChangeRequestHistory();
+ private final ChangeRequestHistory<DataSegmentChangeRequest> changes = new ChangeRequestHistory<>();
+ @Nullable
private final SegmentZNode dummyZnode;
@Inject
@@ -87,20 +88,15 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer
this.server = server;
this.liveSegmentLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), server.getName());
- segmentTransformer = new Function<DataSegment, DataSegment>()
- {
- @Override
- public DataSegment apply(DataSegment input)
- {
- DataSegment rv = input;
- if (config.isSkipDimensionsAndMetrics()) {
- rv = rv.withDimensions(null).withMetrics(null);
- }
- if (config.isSkipLoadSpec()) {
- rv = rv.withLoadSpec(null);
- }
- return rv;
+ segmentTransformer = input -> {
+ DataSegment rv = input;
+ if (config.isSkipDimensionsAndMetrics()) {
+ rv = rv.withDimensions(null).withMetrics(null);
+ }
+ if (config.isSkipLoadSpec()) {
+ rv = rv.withLoadSpec(null);
}
+ return rv;
};
if (this.config.isSkipSegmentAnnouncementOnZk()) {
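Besides the Function-to-lambda rewrite, this hunk swaps the raw 'new ChangeRequestHistory()' for the diamond form, so the instance is created as ChangeRequestHistory<DataSegmentChangeRequest> rather than as a raw type that opts out of generic checking. The difference in miniature, using a JDK collection:

    import java.util.ArrayList;
    import java.util.List;

    class Diamond
    {
      List<String> raw = new ArrayList();        // raw type: unchecked warning, no element checks
      List<String> checked = new ArrayList<>();  // diamond: element type inferred as String
    }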
diff --git a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncerProvider.java b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncerProvider.java
index b4acbf7..b521594 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncerProvider.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncerProvider.java
@@ -21,15 +21,15 @@ package org.apache.druid.server.coordination;
import com.fasterxml.jackson.annotation.JacksonInject;
-import javax.validation.constraints.NotNull;
+import javax.annotation.Nullable;
/**
*/
public class BatchDataSegmentAnnouncerProvider implements DataSegmentAnnouncerProvider
{
@JacksonInject
- @NotNull
- private BatchDataSegmentAnnouncer batchAnnouncer = null;
+ @Nullable
+ private final BatchDataSegmentAnnouncer batchAnnouncer = null;
@Override
public DataSegmentAnnouncer get()
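The swap from javax.validation's @NotNull to @Nullable looks backwards at first glance, but batchAnnouncer is populated by injection after construction: inside the class the field genuinely starts out null, which is exactly what SpotBugs checks, while the non-null guarantee is enforced by the injection framework at wiring time. In Druid the value arrives via Guice; a plain-Jackson sketch of the same @JacksonInject mechanism, with hypothetical names:

    import com.fasterxml.jackson.annotation.JacksonInject;
    import com.fasterxml.jackson.databind.InjectableValues;
    import com.fasterxml.jackson.databind.ObjectMapper;

    class InjectExample
    {
      static class Holder
      {
        @JacksonInject
        public String injected; // filled in by the mapper, not by the JSON payload
      }

      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        mapper.setInjectableValues(new InjectableValues.Std().addValue(String.class, "hello"));
        Holder h = mapper.readValue("{}", Holder.class);
        System.out.println(h.injected); // prints "hello"
      }
    }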
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
index 8f38f9f..fc466d1 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
@@ -39,6 +39,7 @@ import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;
+import javax.annotation.Nullable;
import javax.servlet.http.HttpServletResponse;
import java.io.InputStream;
import java.net.URL;
@@ -84,12 +85,14 @@ public class ChangeRequestHttpSyncer<T>
private final LifecycleLock startStopLock = new LifecycleLock();
private final String logIdentity;
- private ChangeRequestHistory.Counter counter = null;
private long unstableStartTime = -1;
private int consecutiveFailedAttemptCount = 0;
private long lastSuccessfulSyncTime = 0;
private long lastSyncTime = 0;
+ @Nullable
+ private ChangeRequestHistory.Counter counter = null;
+
public ChangeRequestHttpSyncer(
ObjectMapper smileMapper,
HttpClient httpClient,
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index c2a58ba..cb08182 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -44,6 +44,8 @@ import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.timeline.DataSegment;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -240,14 +242,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
addSegments(
cachedSegments,
- new DataSegmentChangeCallback()
- {
- @Override
- public void execute()
- {
- log.info("Cache load took %,d ms", System.currentTimeMillis() - start);
- }
- }
+ () -> log.info("Cache load took %,d ms", System.currentTimeMillis() - start)
);
}
@@ -348,35 +343,30 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
for (final DataSegment segment : segments) {
loadingExecutor.submit(
- new Runnable()
- {
- @Override
- public void run()
- {
+ () -> {
+ try {
+ log.info(
+ "Loading segment[%d/%d][%s]",
+ counter.incrementAndGet(),
+ numSegments,
+ segment.getId()
+ );
+ loadSegment(segment, callback);
try {
- log.info(
- "Loading segment[%d/%d][%s]",
- counter.incrementAndGet(),
- numSegments,
- segment.getId()
- );
- loadSegment(segment, callback);
- try {
- backgroundSegmentAnnouncer.announceSegment(segment);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SegmentLoadingException(e, "Loading Interrupted");
- }
+ backgroundSegmentAnnouncer.announceSegment(segment);
}
- catch (SegmentLoadingException e) {
- log.error(e, "[%s] failed to load", segment.getId());
- failedSegments.add(segment);
- }
- finally {
- latch.countDown();
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SegmentLoadingException(e, "Loading Interrupted");
}
}
+ catch (SegmentLoadingException e) {
+ log.error(e, "[%s] failed to load", segment.getId());
+ failedSegments.add(segment);
+ }
+ finally {
+ latch.countDown();
+ }
}
);
}
@@ -427,28 +417,23 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
announcer.unannounceSegment(segment);
segmentsToDelete.add(segment);
- Runnable runnable = new Runnable()
- {
- @Override
- public void run()
- {
- try {
- synchronized (segmentDeleteLock) {
- if (segmentsToDelete.remove(segment)) {
- segmentManager.dropSegment(segment);
-
- File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getId().toString());
- if (!segmentInfoCacheFile.delete()) {
- log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
- }
+ Runnable runnable = () -> {
+ try {
+ synchronized (segmentDeleteLock) {
+ if (segmentsToDelete.remove(segment)) {
+ segmentManager.dropSegment(segment);
+
+ File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getId().toString());
+ if (!segmentInfoCacheFile.delete()) {
+ log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}
}
}
- catch (Exception e) {
- log.makeAlert(e, "Failed to remove segment! Possible resource leak!")
- .addData("segment", segment)
- .emit();
- }
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Failed to remove segment! Possible resource leak!")
+ .addData("segment", segment)
+ .emit();
}
};
@@ -543,7 +528,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
);
}
},
- () -> resolveWaitingFutures()
+ this::resolveWaitingFutures
);
}
return requestStatuses.getIfPresent(changeRequest);
@@ -588,7 +573,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
private final Object lock = new Object();
private volatile boolean finished = false;
+ @Nullable
private volatile ScheduledFuture startedAnnouncing = null;
+ @Nullable
private volatile ScheduledFuture nextAnnoucement = null;
public BackgroundSegmentAnnouncer(
@@ -755,6 +742,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
}
private final STATE state;
+ @Nullable
private final String failureCause;
public static final Status SUCCESS = new Status(STATE.SUCCESS, null);
@@ -763,7 +751,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
@JsonCreator
Status(
@JsonProperty("state") STATE state,
- @JsonProperty("failureCause") String failureCause
+ @JsonProperty("failureCause") @Nullable String failureCause
)
{
Preconditions.checkNotNull(state, "state must be non-null");
@@ -782,6 +770,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
return state;
}
+ @Nullable
@JsonProperty
public String getFailureCause()
{
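Two smaller touches in this file: the no-argument lambda '() -> resolveWaitingFutures()' becomes the method reference 'this::resolveWaitingFutures', and Status's failureCause is annotated @Nullable on both the field and the @JsonProperty constructor parameter, matching the null passed for Status.SUCCESS. The lambda/method-reference equivalence in isolation (illustrative names):

    import java.util.ArrayList;
    import java.util.List;

    class MethodRef
    {
      private final List<Runnable> callbacks = new ArrayList<>();

      private void resolve()
      {
        System.out.println("resolved");
      }

      void register()
      {
        callbacks.add(() -> resolve()); // forwarding lambda
        callbacks.add(this::resolve);   // equivalent method reference
      }
    }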
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java
index ca56b10..734de20 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java
@@ -25,8 +25,6 @@ import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -35,6 +33,8 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.concurrent.ExecutorService;
@@ -54,6 +54,7 @@ public class ZkCoordinator
private final DruidServerMetadata me;
private final CuratorFramework curator;
+ @Nullable
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started = false;
private final ExecutorService segmentLoadUnloadService;
@@ -107,22 +108,17 @@ public class ZkCoordinator
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
loadQueueCache.getListenable().addListener(
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
- {
- final ChildData child = event.getData();
- switch (event.getType()) {
- case CHILD_ADDED:
- childAdded(child);
- break;
- case CHILD_REMOVED:
- log.info("zNode[%s] was removed", event.getData().getPath());
- break;
- default:
- log.info("Ignoring event[%s]", event);
- }
+ (client, event) -> {
+ final ChildData child = event.getData();
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ childAdded(child);
+ break;
+ case CHILD_REMOVED:
+ log.info("zNode[%s] was removed", event.getData().getPath());
+ break;
+ default:
+ log.info("Ignoring event[%s]", event);
}
}
@@ -151,25 +147,20 @@ public class ZkCoordinator
finalRequest.go(
dataSegmentChangeHandler,
- new DataSegmentChangeCallback()
- {
- @Override
- public void execute()
- {
+ () -> {
+ try {
+ curator.delete().guaranteed().forPath(path);
+ log.info("Completed request [%s]", finalRequest.asString());
+ }
+ catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
- log.info("Completed request [%s]", finalRequest.asString());
}
- catch (Exception e) {
- try {
- curator.delete().guaranteed().forPath(path);
- }
- catch (Exception e1) {
- log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
- }
- log.error(e, "Exception while removing zNode[%s]", path);
- throw new RuntimeException(e);
+ catch (Exception e1) {
+ log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}
+ log.error(e, "Exception while removing zNode[%s]", path);
+ throw new RuntimeException(e);
}
}
);