You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by as...@apache.org on 2019/07/21 13:23:56 UTC

[incubator-druid] branch master updated: Spotbugs: NP_STORE_INTO_NONNULL_FIELD (#8021)

This is an automated email from the ASF dual-hosted git repository.

asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e1a7457  Spotbugs: NP_STORE_INTO_NONNULL_FIELD (#8021)
e1a7457 is described below

commit e1a745717e0b008ca76a0a27e96d3097fc48a8f9
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Sun Jul 21 15:23:47 2019 +0200

    Spotbugs: NP_STORE_INTO_NONNULL_FIELD (#8021)
---
 codestyle/spotbugs-exclude.xml                     |   1 -
 .../quantiles/DoublesSketchBuildAggregator.java    |   3 +
 .../quantiles/DoublesSketchMergeAggregator.java    |   3 +
 .../datasketches/theta/SketchAggregator.java       |   4 +
 .../datasketches/theta/SketchHolder.java           |   6 +-
 .../tuple/ArrayOfDoublesSketchBuildAggregator.java |   4 +
 .../ArrayOfDoublesSketchBuildBufferAggregator.java |   3 +
 .../tuple/ArrayOfDoublesSketchMergeAggregator.java |   3 +
 .../druid/query/groupby/GroupByQueryEngine.java    |   7 +-
 .../groupby/epinephelinae/BufferArrayGrouper.java  |   4 +
 .../groupby/epinephelinae/BufferHashGrouper.java   |  41 +-
 .../epinephelinae/GroupByQueryEngineV2.java        |   1 +
 .../epinephelinae/RowBasedGrouperHelper.java       | 559 ++++++++-------------
 .../org/apache/druid/segment/MetricHolder.java     |   4 +
 .../segment/SingleScanTimeDimensionSelector.java   |   4 +-
 .../druid/segment/StringDimensionIndexer.java      |   6 +-
 .../druid/segment/StringDimensionMergerV9.java     |  42 +-
 .../apache/druid/segment/column/ColumnBuilder.java |   9 +-
 .../segment/column/ColumnCapabilitiesImpl.java     |   4 +
 .../druid/segment/column/ColumnDescriptor.java     |   4 +
 .../data/BlockLayoutColumnarDoublesSerializer.java |   3 +
 .../data/BlockLayoutColumnarFloatsSerializer.java  |   3 +
 .../data/BlockLayoutColumnarLongsSerializer.java   |   3 +
 .../druid/segment/data/ByteBufferWriter.java       |   4 +
 .../data/CompressedColumnarIntsSerializer.java     |   6 +-
 .../CompressedVSizeColumnarIntsSerializer.java     |   5 +-
 .../apache/druid/segment/data/GenericIndexed.java  |   1 +
 .../druid/segment/data/GenericIndexedWriter.java   |   3 +
 .../data/IntermediateColumnarLongsSerializer.java  |   6 +-
 .../segment/data/LongsLongEncodingWriter.java      |   4 +
 .../segment/data/VSizeColumnarIntsSerializer.java  |   6 +-
 .../data/VSizeColumnarMultiIntsSerializer.java     |   4 +
 .../apache/druid/segment/data/VSizeLongSerde.java  |   9 +-
 .../segment/incremental/IncrementalIndex.java      |  88 ++--
 .../incremental/IncrementalIndexAdapter.java       |   1 +
 .../incremental/IncrementalIndexRowHolder.java     |   3 +
 .../incremental/OffheapIncrementalIndex.java       |   5 +
 .../incremental/OnheapIncrementalIndex.java        |   4 +-
 .../segment/serde/ComplexColumnPartSerde.java      |  18 +-
 .../serde/DictionaryEncodedColumnPartSerde.java    |  10 +-
 .../serde/DoubleNumericColumnPartSerde.java        |  37 +-
 .../serde/DoubleNumericColumnPartSerdeV2.java      |   5 +-
 .../segment/serde/FloatNumericColumnPartSerde.java |   2 +
 .../serde/FloatNumericColumnPartSerdeV2.java       |   5 +-
 .../segment/serde/LongNumericColumnPartSerde.java  |  35 +-
 .../serde/LongNumericColumnPartSerdeV2.java        |  11 +-
 .../firehose/EventReceiverFirehoseFactory.java     |   5 +-
 .../realtime/firehose/PredicateFirehose.java       |   1 +
 .../realtime/firehose/SqlFirehoseFactory.java      |   3 +
 .../coordination/BatchDataSegmentAnnouncer.java    |  24 +-
 .../BatchDataSegmentAnnouncerProvider.java         |   6 +-
 .../coordination/ChangeRequestHttpSyncer.java      |   5 +-
 .../coordination/SegmentLoadDropHandler.java       |  97 ++--
 .../druid/server/coordination/ZkCoordinator.java   |  57 +--
 54 files changed, 588 insertions(+), 603 deletions(-)

diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml
index e1fdbf7..44acdb8 100644
--- a/codestyle/spotbugs-exclude.xml
+++ b/codestyle/spotbugs-exclude.xml
@@ -70,7 +70,6 @@
     <Bug pattern="NP_NULL_PARAM_DEREF"/>
     <Bug pattern="NP_NULL_PARAM_DEREF_NONVIRTUAL"/>
     <Bug pattern="NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE"/>
-    <Bug pattern="NP_STORE_INTO_NONNULL_FIELD"/>
     <Bug pattern="NS_DANGEROUS_NON_SHORT_CIRCUIT"/>
     <Bug pattern="OBL_UNSATISFIED_OBLIGATION"/>
     <Bug pattern="OS_OPEN_STREAM"/>
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java
index 18f94a9..7e0ff89 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java
@@ -24,11 +24,14 @@ import com.yahoo.sketches.quantiles.UpdateDoublesSketch;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.ColumnValueSelector;
 
+import javax.annotation.Nullable;
+
 public class DoublesSketchBuildAggregator implements Aggregator
 {
 
   private final ColumnValueSelector<Double> valueSelector;
 
+  @Nullable
   private UpdateDoublesSketch sketch;
 
   public DoublesSketchBuildAggregator(final ColumnValueSelector<Double> valueSelector, final int size)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
index 4598048..8c19e33 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
@@ -24,10 +24,13 @@ import com.yahoo.sketches.quantiles.DoublesUnion;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.ColumnValueSelector;
 
+import javax.annotation.Nullable;
+
 public class DoublesSketchMergeAggregator implements Aggregator
 {
 
   private final ColumnValueSelector selector;
+  @Nullable
   private DoublesUnion union;
 
   public DoublesSketchMergeAggregator(final ColumnValueSelector selector, final int k)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
index 075d8c7..ee1ddd2 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
@@ -26,12 +26,16 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 public class SketchAggregator implements Aggregator
 {
   private final BaseObjectColumnValueSelector selector;
   private final int size;
+
+  @Nullable
   private Union union;
 
   public SketchAggregator(BaseObjectColumnValueSelector selector, int size)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchHolder.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchHolder.java
index 9ac9056..9ba8cda 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchHolder.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchHolder.java
@@ -35,6 +35,8 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.Comparator;
 
@@ -49,7 +51,7 @@ public class SketchHolder
   );
 
   public static final Comparator<Object> COMPARATOR = Ordering.from(
-      new Comparator()
+      new Comparator<Object>()
       {
         @Override
         public int compare(Object o1, Object o2)
@@ -108,7 +110,9 @@ public class SketchHolder
 
   private final Object obj;
 
+  @Nullable
   private volatile Double cachedEstimate = null;
+  @Nullable
   private volatile Sketch cachedSketch = null;
 
   private SketchHolder(Object obj)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
index 11a3061..2781bc5 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
@@ -26,6 +26,8 @@ import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.data.IndexedInts;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 /**
@@ -38,7 +40,9 @@ public class ArrayOfDoublesSketchBuildAggregator implements Aggregator
 
   private final DimensionSelector keySelector;
   private final BaseDoubleColumnValueSelector[] valueSelectors;
+  @Nullable
   private double[] values; // not part of the state, but to reuse in aggregate() method
+  @Nullable
   private ArrayOfDoublesUpdatableSketch sketch;
 
   public ArrayOfDoublesSketchBuildAggregator(
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
index 051d59f..1c79df8 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
@@ -30,6 +30,8 @@ import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.data.IndexedInts;
 
+import javax.annotation.Nullable;
+
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.List;
@@ -50,6 +52,7 @@ public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregat
   private final BaseDoubleColumnValueSelector[] valueSelectors;
   private final int nominalEntries;
   private final int maxIntermediateSize;
+  @Nullable
   private double[] values; // not part of the state, but to reuse in aggregate() method
   private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
 
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java
index 3f99ce9..db33fb9 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java
@@ -25,6 +25,8 @@ import com.yahoo.sketches.tuple.ArrayOfDoublesUnion;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 
+import javax.annotation.Nullable;
+
 /**
  * This aggregator merges existing sketches.
  * The input column contains ArrayOfDoublesSketch.
@@ -34,6 +36,7 @@ public class ArrayOfDoublesSketchMergeAggregator implements Aggregator
 {
 
   private final BaseObjectColumnValueSelector<ArrayOfDoublesSketch> selector;
+  @Nullable
   private ArrayOfDoublesUnion union;
 
   public ArrayOfDoublesSketchMergeAggregator(
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java
index 1383b20..96a0b76 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java
@@ -298,14 +298,13 @@ public class GroupByQueryEngine
     private final ByteBuffer metricsBuffer;
     private final int maxIntermediateRows;
 
-    private final List<DimensionSpec> dimensionSpecs;
     private final List<DimensionSelector> dimensions;
     private final ArrayList<String> dimNames;
-    private final List<AggregatorFactory> aggregatorSpecs;
     private final BufferAggregator[] aggregators;
     private final String[] metricNames;
     private final int[] sizesRequired;
 
+    @Nullable
     private List<ByteBuffer> unprocessedKeys;
     private Iterator<Row> delegate;
 
@@ -320,7 +319,7 @@ public class GroupByQueryEngine
 
       unprocessedKeys = null;
       delegate = Collections.emptyIterator();
-      dimensionSpecs = query.getDimensions();
+      List<DimensionSpec> dimensionSpecs = query.getDimensions();
       dimensions = Lists.newArrayListWithExpectedSize(dimensionSpecs.size());
       dimNames = Lists.newArrayListWithExpectedSize(dimensionSpecs.size());
 
@@ -340,7 +339,7 @@ public class GroupByQueryEngine
         dimNames.add(dimSpec.getOutputName());
       }
 
-      aggregatorSpecs = query.getAggregatorSpecs();
+      List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
       aggregators = new BufferAggregator[aggregatorSpecs.size()];
       metricNames = new String[aggregatorSpecs.size()];
       sizesRequired = new int[aggregatorSpecs.size()];
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
index 7fd34bf..a0fc937 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
@@ -28,6 +28,8 @@ import org.apache.druid.query.aggregation.AggregatorAdapters;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -59,7 +61,9 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
   private ByteBuffer valBuffer;
 
   // Scratch objects used by aggregateVector(). Only set if initVectorized() is called.
+  @Nullable
   private int[] vAggregationPositions = null;
+  @Nullable
   private int[] vAggregationRows = null;
 
   static long requiredBufferCapacity(
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
index 1799579..a695066 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
@@ -28,10 +28,11 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.query.aggregation.AggregatorAdapters;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 
+import javax.annotation.Nullable;
+
 import java.nio.ByteBuffer;
 import java.util.AbstractList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.function.ToIntFunction;
@@ -42,7 +43,6 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
   private static final int DEFAULT_INITIAL_BUCKETS = 1024;
   private static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f;
 
-  private ByteBuffer buffer;
   private boolean initialized = false;
 
   // The BufferHashGrouper normally sorts by all fields of the grouping key with lexicographic ascending order.
@@ -54,16 +54,17 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
   // to get a comparator that uses the ordering defined by the OrderByColumnSpec of a query.
   private final boolean useDefaultSorting;
 
-  // Track the offsets of used buckets using this list.
-  // When a new bucket is initialized by initializeNewBucketKey(), an offset is added to this list.
-  // When expanding the table, the list is reset() and filled with the new offsets of the copied buckets.
-  private ByteBuffer offsetListBuffer;
+  @Nullable
   private ByteBufferIntList offsetList;
 
   // Scratch objects used by aggregateVector(). Only set if initVectorized() is called.
+  @Nullable
   private ByteBuffer vKeyBuffer = null;
+  @Nullable
   private int[] vKeyHashCodes = null;
+  @Nullable
   private int[] vAggregationPositions = null;
+  @Nullable
   private int[] vAggregationRows = null;
 
   public BufferHashGrouper(
@@ -93,7 +94,7 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
   public void init()
   {
     if (!initialized) {
-      this.buffer = bufferSupplier.get();
+      ByteBuffer buffer = bufferSupplier.get();
 
       int hashTableSize = ByteBufferHashTable.calculateTableArenaSizeWithPerBucketAdditionalSize(
           buffer.capacity(),
@@ -106,7 +107,10 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
       hashTableBuffer.limit(hashTableSize);
       hashTableBuffer = hashTableBuffer.slice();
 
-      offsetListBuffer = buffer.duplicate();
+      // Track the offsets of used buckets using this list.
+      // When a new bucket is initialized by initializeNewBucketKey(), an offset is added to this list.
+      // When expanding the table, the list is reset() and filled with the new offsets of the copied buckets.
+      ByteBuffer offsetListBuffer = buffer.duplicate();
       offsetListBuffer.position(hashTableSize);
       offsetListBuffer.limit(buffer.capacity());
       offsetListBuffer = offsetListBuffer.slice();
@@ -315,19 +319,14 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
       // Sort offsets in-place.
       Collections.sort(
           wrappedOffsets,
-          new Comparator<Integer>()
-          {
-            @Override
-            public int compare(Integer lhs, Integer rhs)
-            {
-              final ByteBuffer tableBuffer = hashTable.getTableBuffer();
-              return comparator.compare(
-                  tableBuffer,
-                  tableBuffer,
-                  lhs + HASH_SIZE,
-                  rhs + HASH_SIZE
-              );
-            }
+          (lhs, rhs) -> {
+            final ByteBuffer tableBuffer = hashTable.getTableBuffer();
+            return comparator.compare(
+                tableBuffer,
+                tableBuffer,
+                lhs + HASH_SIZE,
+                rhs + HASH_SIZE
+            );
           }
       );
 
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index a39441d..836f36f 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -363,6 +363,7 @@ public class GroupByQueryEngineV2
     protected final GroupByColumnSelectorPlus[] dims;
     protected final DateTime timestamp;
 
+    @Nullable
     protected CloseableGrouperIterator<KeyType, Row> delegate = null;
     protected final boolean allSingleValueDims;
 
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
index de5da62..2c75c35 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
@@ -256,35 +256,27 @@ public class RowBasedGrouperHelper
         valueTypes
     );
 
-    final Accumulator<AggregateResult, Row> accumulator = new Accumulator<AggregateResult, Row>()
-    {
-      @Override
-      public AggregateResult accumulate(
-          final AggregateResult priorResult,
-          final Row row
-      )
-      {
-        BaseQuery.checkInterrupted();
+    final Accumulator<AggregateResult, Row> accumulator = (priorResult, row) -> {
+      BaseQuery.checkInterrupted();
 
-        if (priorResult != null && !priorResult.isOk()) {
-          // Pass-through error returns without doing more work.
-          return priorResult;
-        }
+      if (priorResult != null && !priorResult.isOk()) {
+        // Pass-through error returns without doing more work.
+        return priorResult;
+      }
 
-        if (!grouper.isInitialized()) {
-          grouper.init();
-        }
+      if (!grouper.isInitialized()) {
+        grouper.init();
+      }
 
-        columnSelectorRow.set(row);
+      columnSelectorRow.set(row);
 
-        final Comparable[] key = new Comparable[keySize];
-        valueExtractFn.apply(row, key);
+      final Comparable[] key = new Comparable[keySize];
+      valueExtractFn.apply(row, key);
 
-        final AggregateResult aggregateResult = grouper.aggregate(new RowBasedKey(key));
-        columnSelectorRow.set(null);
+      final AggregateResult aggregateResult = grouper.aggregate(new RowBasedKey(key));
+      columnSelectorRow.set(null);
 
-        return aggregateResult;
-      }
+      return aggregateResult;
     };
 
     return new Pair<>(grouper, accumulator);
@@ -302,33 +294,12 @@ public class RowBasedGrouperHelper
   {
     if (isInputRaw) {
       if (query.getGranularity() instanceof AllGranularity) {
-        return new TimestampExtractFunction()
-        {
-          @Override
-          public long apply(Row row)
-          {
-            return query.getIntervals().get(0).getStartMillis();
-          }
-        };
+        return row -> query.getIntervals().get(0).getStartMillis();
       } else {
-        return new TimestampExtractFunction()
-        {
-          @Override
-          public long apply(Row row)
-          {
-            return query.getGranularity().bucketStart(row.getTimestamp()).getMillis();
-          }
-        };
+        return row -> query.getGranularity().bucketStart(row.getTimestamp()).getMillis();
       }
     } else {
-      return new TimestampExtractFunction()
-      {
-        @Override
-        public long apply(Row row)
-        {
-          return row.getTimestampFromEpoch();
-        }
-      };
+      return Row::getTimestampFromEpoch;
     }
   }
 
@@ -358,60 +329,40 @@ public class RowBasedGrouperHelper
       );
 
       if (includeTimestamp) {
-        return new ValueExtractFunction()
-        {
-          @Override
-          public Comparable[] apply(Row row, Comparable[] key)
-          {
-            key[0] = timestampExtractFn.apply(row);
-            for (int i = 1; i < key.length; i++) {
-              final Comparable val = inputRawSuppliers[i - 1].get();
-              key[i] = valueConvertFns[i - 1].apply(val);
-            }
-            return key;
+        return (row, key) -> {
+          key[0] = timestampExtractFn.apply(row);
+          for (int i = 1; i < key.length; i++) {
+            final Comparable val = inputRawSuppliers[i - 1].get();
+            key[i] = valueConvertFns[i - 1].apply(val);
           }
+          return key;
         };
       } else {
-        return new ValueExtractFunction()
-        {
-          @Override
-          public Comparable[] apply(Row row, Comparable[] key)
-          {
-            for (int i = 0; i < key.length; i++) {
-              final Comparable val = inputRawSuppliers[i].get();
-              key[i] = valueConvertFns[i].apply(val);
-            }
-            return key;
+        return (row, key) -> {
+          for (int i = 0; i < key.length; i++) {
+            final Comparable val = inputRawSuppliers[i].get();
+            key[i] = valueConvertFns[i].apply(val);
           }
+          return key;
         };
       }
     } else {
       if (includeTimestamp) {
-        return new ValueExtractFunction()
-        {
-          @Override
-          public Comparable[] apply(Row row, Comparable[] key)
-          {
-            key[0] = timestampExtractFn.apply(row);
-            for (int i = 1; i < key.length; i++) {
-              final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i - 1).getOutputName());
-              key[i] = valueConvertFns[i - 1].apply(val);
-            }
-            return key;
+        return (row, key) -> {
+          key[0] = timestampExtractFn.apply(row);
+          for (int i = 1; i < key.length; i++) {
+            final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i - 1).getOutputName());
+            key[i] = valueConvertFns[i - 1].apply(val);
           }
+          return key;
         };
       } else {
-        return new ValueExtractFunction()
-        {
-          @Override
-          public Comparable[] apply(Row row, Comparable[] key)
-          {
-            for (int i = 0; i < key.length; i++) {
-              final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i).getOutputName());
-              key[i] = valueConvertFns[i].apply(val);
-            }
-            return key;
+        return (row, key) -> {
+          for (int i = 0; i < key.length; i++) {
+            final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i).getOutputName());
+            key[i] = valueConvertFns[i].apply(val);
           }
+          return key;
         };
       }
     }
@@ -437,56 +388,51 @@ public class RowBasedGrouperHelper
 
     return new CloseableGrouperIterator<>(
         grouper.iterator(true),
-        new Function<Grouper.Entry<RowBasedKey>, Row>()
-        {
-          @Override
-          public Row apply(Grouper.Entry<RowBasedKey> entry)
-          {
-            Map<String, Object> theMap = Maps.newLinkedHashMap();
-
-            // Get timestamp, maybe.
-            final DateTime timestamp;
-            final int dimStart;
-
-            if (includeTimestamp) {
-              timestamp = query.getGranularity().toDateTime(((long) (entry.getKey().getKey()[0])));
-              dimStart = 1;
-            } else {
-              timestamp = null;
-              dimStart = 0;
-            }
+        entry -> {
+          Map<String, Object> theMap = Maps.newLinkedHashMap();
 
-            // Add dimensions.
-            if (dimsToInclude == null) {
-              for (int i = dimStart; i < entry.getKey().getKey().length; i++) {
-                Object dimVal = entry.getKey().getKey()[i];
-                theMap.put(
-                    query.getDimensions().get(i - dimStart).getOutputName(),
-                    dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal
-                );
-              }
-            } else {
-              Map<String, Object> dimensions = new HashMap<>();
-              for (int i = dimStart; i < entry.getKey().getKey().length; i++) {
-                Object dimVal = entry.getKey().getKey()[i];
-                dimensions.put(
-                    query.getDimensions().get(i - dimStart).getOutputName(),
-                    dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal
-                );
-              }
-
-              for (String dimToInclude : dimsToInclude) {
-                theMap.put(dimToInclude, dimensions.get(dimToInclude));
-              }
+          // Get timestamp, maybe.
+          final DateTime timestamp;
+          final int dimStart;
+
+          if (includeTimestamp) {
+            timestamp = query.getGranularity().toDateTime(((long) (entry.getKey().getKey()[0])));
+            dimStart = 1;
+          } else {
+            timestamp = null;
+            dimStart = 0;
+          }
+
+          // Add dimensions.
+          if (dimsToInclude == null) {
+            for (int i = dimStart; i < entry.getKey().getKey().length; i++) {
+              Object dimVal = entry.getKey().getKey()[i];
+              theMap.put(
+                  query.getDimensions().get(i - dimStart).getOutputName(),
+                  dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal
+              );
+            }
+          } else {
+            Map<String, Object> dimensions = new HashMap<>();
+            for (int i = dimStart; i < entry.getKey().getKey().length; i++) {
+              Object dimVal = entry.getKey().getKey()[i];
+              dimensions.put(
+                  query.getDimensions().get(i - dimStart).getOutputName(),
+                  dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal
+              );
             }
 
-            // Add aggregations.
-            for (int i = 0; i < entry.getValues().length; i++) {
-              theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]);
+            for (String dimToInclude : dimsToInclude) {
+              theMap.put(dimToInclude, dimensions.get(dimToInclude));
             }
+          }
 
-            return new MapBasedRow(timestamp, theMap);
+          // Add aggregations.
+          for (int i = 0; i < entry.getValues().length; i++) {
+            theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]);
           }
+
+          return new MapBasedRow(timestamp, theMap);
         },
         closeable
     );
@@ -713,47 +659,30 @@ public class RowBasedGrouperHelper
 
       if (includeTimestamp) {
         if (sortByDimsFirst) {
-          return new Comparator<Grouper.Entry<RowBasedKey>>()
-          {
-            @Override
-            public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
-            {
-              final int cmp = compareDimsInRows(entry1.getKey(), entry2.getKey(), 1);
-              if (cmp != 0) {
-                return cmp;
-              }
-
-              return Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]);
+          return (entry1, entry2) -> {
+            final int cmp = compareDimsInRows(entry1.getKey(), entry2.getKey(), 1);
+            if (cmp != 0) {
+              return cmp;
             }
+
+            return Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]);
           };
         } else {
-          return new Comparator<Grouper.Entry<RowBasedKey>>()
-          {
-            @Override
-            public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
-            {
-              final int timeCompare = Longs.compare(
-                  (long) entry1.getKey().getKey()[0],
-                  (long) entry2.getKey().getKey()[0]
-              );
-
-              if (timeCompare != 0) {
-                return timeCompare;
-              }
+          return (entry1, entry2) -> {
+            final int timeCompare = Longs.compare(
+                (long) entry1.getKey().getKey()[0],
+                (long) entry2.getKey().getKey()[0]
+            );
 
-              return compareDimsInRows(entry1.getKey(), entry2.getKey(), 1);
+            if (timeCompare != 0) {
+              return timeCompare;
             }
+
+            return compareDimsInRows(entry1.getKey(), entry2.getKey(), 1);
           };
         }
       } else {
-        return new Comparator<Grouper.Entry<RowBasedKey>>()
-        {
-          @Override
-          public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
-          {
-            return compareDimsInRows(entry1.getKey(), entry2.getKey(), 0);
-          }
-        };
+        return (entry1, entry2) -> compareDimsInRows(entry1.getKey(), entry2.getKey(), 0);
       }
     }
 
@@ -804,74 +733,57 @@ public class RowBasedGrouperHelper
 
       if (includeTimestamp) {
         if (sortByDimsFirst) {
-          return new Comparator<Grouper.Entry<RowBasedKey>>()
-          {
-            @Override
-            public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
-            {
-              final int cmp = compareDimsInRowsWithAggs(
-                  entry1,
-                  entry2,
-                  1,
-                  needsReverses,
-                  aggFlags,
-                  fieldIndices,
-                  isNumericField,
-                  comparators
-              );
-              if (cmp != 0) {
-                return cmp;
-              }
-
-              return Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]);
+          return (entry1, entry2) -> {
+            final int cmp = compareDimsInRowsWithAggs(
+                entry1,
+                entry2,
+                1,
+                needsReverses,
+                aggFlags,
+                fieldIndices,
+                isNumericField,
+                comparators
+            );
+            if (cmp != 0) {
+              return cmp;
             }
+
+            return Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]);
           };
         } else {
-          return new Comparator<Grouper.Entry<RowBasedKey>>()
-          {
-            @Override
-            public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
-            {
-              final int timeCompare = Longs.compare(
-                  (long) entry1.getKey().getKey()[0],
-                  (long) entry2.getKey().getKey()[0]
-              );
+          return (entry1, entry2) -> {
+            final int timeCompare = Longs.compare(
+                (long) entry1.getKey().getKey()[0],
+                (long) entry2.getKey().getKey()[0]
+            );
 
-              if (timeCompare != 0) {
-                return timeCompare;
-              }
-
-              return compareDimsInRowsWithAggs(
-                  entry1,
-                  entry2,
-                  1,
-                  needsReverses,
-                  aggFlags,
-                  fieldIndices,
-                  isNumericField,
-                  comparators
-              );
+            if (timeCompare != 0) {
+              return timeCompare;
             }
-          };
-        }
-      } else {
-        return new Comparator<Grouper.Entry<RowBasedKey>>()
-        {
-          @Override
-          public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
-          {
+
             return compareDimsInRowsWithAggs(
                 entry1,
                 entry2,
-                0,
+                1,
                 needsReverses,
                 aggFlags,
                 fieldIndices,
                 isNumericField,
                 comparators
             );
-          }
-        };
+          };
+        }
+      } else {
+        return (entry1, entry2) -> compareDimsInRowsWithAggs(
+            entry1,
+            entry2,
+            0,
+            needsReverses,
+            aggFlags,
+            fieldIndices,
+            isNumericField,
+            comparators
+        );
       }
     }
 
@@ -981,6 +893,7 @@ public class RowBasedGrouperHelper
     // dictionary id -> rank of the sorted dictionary
     // This is initialized in the constructor and bufferComparator() with static dictionary and dynamic dictionary,
     // respectively.
+    @Nullable
     private int[] rankOfDictionaryIds = null;
 
     RowBasedKeySerde(
@@ -1118,68 +1031,53 @@ public class RowBasedGrouperHelper
 
       if (includeTimestamp) {
         if (sortByDimsFirst) {
-          return new Grouper.BufferComparator()
-          {
-            @Override
-            public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
-            {
-              final int cmp = compareDimsInBuffersForNullFudgeTimestamp(
-                  serdeHelperComparators,
-                  lhsBuffer,
-                  rhsBuffer,
-                  lhsPosition,
-                  rhsPosition
-              );
-              if (cmp != 0) {
-                return cmp;
-              }
-
-              return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
+          return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+            final int cmp = compareDimsInBuffersForNullFudgeTimestamp(
+                serdeHelperComparators,
+                lhsBuffer,
+                rhsBuffer,
+                lhsPosition,
+                rhsPosition
+            );
+            if (cmp != 0) {
+              return cmp;
             }
+
+            return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
           };
         } else {
-          return new Grouper.BufferComparator()
-          {
-            @Override
-            public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
-            {
-              final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
-
-              if (timeCompare != 0) {
-                return timeCompare;
-              }
-
-              return compareDimsInBuffersForNullFudgeTimestamp(
-                  serdeHelperComparators,
-                  lhsBuffer,
-                  rhsBuffer,
-                  lhsPosition,
-                  rhsPosition
-              );
+          return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+            final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
+
+            if (timeCompare != 0) {
+              return timeCompare;
             }
+
+            return compareDimsInBuffersForNullFudgeTimestamp(
+                serdeHelperComparators,
+                lhsBuffer,
+                rhsBuffer,
+                lhsPosition,
+                rhsPosition
+            );
           };
         }
       } else {
-        return new Grouper.BufferComparator()
-        {
-          @Override
-          public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
-          {
-            for (int i = 0; i < dimCount; i++) {
-              final int cmp = serdeHelperComparators[i].compare(
-                  lhsBuffer,
-                  rhsBuffer,
-                  lhsPosition,
-                  rhsPosition
-              );
+        return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+          for (int i = 0; i < dimCount; i++) {
+            final int cmp = serdeHelperComparators[i].compare(
+                lhsBuffer,
+                rhsBuffer,
+                lhsPosition,
+                rhsPosition
+            );
 
-              if (cmp != 0) {
-                return cmp;
-              }
+            if (cmp != 0) {
+              return cmp;
             }
-
-            return 0;
           }
+
+          return 0;
         };
       }
     }
@@ -1246,84 +1144,69 @@ public class RowBasedGrouperHelper
 
       if (includeTimestamp) {
         if (sortByDimsFirst) {
-          return new Grouper.BufferComparator()
-          {
-            @Override
-            public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
-            {
-              final int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown(
-                  adjustedSerdeHelperComparators,
-                  needsReverses,
-                  fieldCount,
-                  lhsBuffer,
-                  rhsBuffer,
-                  lhsPosition,
-                  rhsPosition
-              );
-              if (cmp != 0) {
-                return cmp;
-              }
-
-              return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
+          return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+            final int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown(
+                adjustedSerdeHelperComparators,
+                needsReverses,
+                fieldCount,
+                lhsBuffer,
+                rhsBuffer,
+                lhsPosition,
+                rhsPosition
+            );
+            if (cmp != 0) {
+              return cmp;
             }
+
+            return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
           };
         } else {
-          return new Grouper.BufferComparator()
-          {
-            @Override
-            public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
-            {
-              final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
-
-              if (timeCompare != 0) {
-                return timeCompare;
-              }
-
-              int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown(
-                  adjustedSerdeHelperComparators,
-                  needsReverses,
-                  fieldCount,
+          return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+            final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
+
+            if (timeCompare != 0) {
+              return timeCompare;
+            }
+
+            int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown(
+                adjustedSerdeHelperComparators,
+                needsReverses,
+                fieldCount,
+                lhsBuffer,
+                rhsBuffer,
+                lhsPosition,
+                rhsPosition
+            );
+
+            return cmp;
+          };
+        }
+      } else {
+        return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+          for (int i = 0; i < fieldCount; i++) {
+            final int cmp;
+            if (needsReverses.get(i)) {
+              cmp = adjustedSerdeHelperComparators[i].compare(
+                  rhsBuffer,
+                  lhsBuffer,
+                  rhsPosition,
+                  lhsPosition
+              );
+            } else {
+              cmp = adjustedSerdeHelperComparators[i].compare(
                   lhsBuffer,
                   rhsBuffer,
                   lhsPosition,
                   rhsPosition
               );
+            }
 
+            if (cmp != 0) {
               return cmp;
             }
-          };
-        }
-      } else {
-        return new Grouper.BufferComparator()
-        {
-          @Override
-          public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
-          {
-            for (int i = 0; i < fieldCount; i++) {
-              final int cmp;
-              if (needsReverses.get(i)) {
-                cmp = adjustedSerdeHelperComparators[i].compare(
-                    rhsBuffer,
-                    lhsBuffer,
-                    rhsPosition,
-                    lhsPosition
-                );
-              } else {
-                cmp = adjustedSerdeHelperComparators[i].compare(
-                    lhsBuffer,
-                    rhsBuffer,
-                    lhsPosition,
-                    rhsPosition
-                );
-              }
-
-              if (cmp != 0) {
-                return cmp;
-              }
-            }
-
-            return 0;
           }
+
+          return 0;
         };
       }
     }
diff --git a/processing/src/main/java/org/apache/druid/segment/MetricHolder.java b/processing/src/main/java/org/apache/druid/segment/MetricHolder.java
index 7705735..8f4ee20 100644
--- a/processing/src/main/java/org/apache/druid/segment/MetricHolder.java
+++ b/processing/src/main/java/org/apache/druid/segment/MetricHolder.java
@@ -26,6 +26,8 @@ import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.serde.ComplexMetricSerde;
 import org.apache.druid.segment.serde.ComplexMetrics;
 
+import javax.annotation.Nullable;
+
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
@@ -96,7 +98,9 @@ public class MetricHolder
   private final String name;
   private final String typeName;
   private final MetricType type;
+  @Nullable
   CompressedColumnarFloatsSupplier floatType = null;
+  @Nullable
   GenericIndexed<?> complexType = null;
 
   private MetricHolder(
diff --git a/processing/src/main/java/org/apache/druid/segment/SingleScanTimeDimensionSelector.java b/processing/src/main/java/org/apache/druid/segment/SingleScanTimeDimensionSelector.java
index 8a1be5a..1cf3efb 100644
--- a/processing/src/main/java/org/apache/druid/segment/SingleScanTimeDimensionSelector.java
+++ b/processing/src/main/java/org/apache/druid/segment/SingleScanTimeDimensionSelector.java
@@ -46,10 +46,12 @@ public class SingleScanTimeDimensionSelector implements DimensionSelector
 
   private final List<String> timeValues = new ArrayList<>();
   private final SingleIndexedInt row = new SingleIndexedInt();
-  private String currentValue = null;
   private long currentTimestamp = Long.MIN_VALUE;
   private int index = -1;
 
+  @Nullable
+  private String currentValue = null;
+
   public SingleScanTimeDimensionSelector(
       BaseLongColumnValueSelector selector,
       @Nullable ExtractionFn extractionFn,
diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
index cf201e4..332da92 100644
--- a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
+++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
@@ -232,9 +232,11 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
   private final DimensionDictionary dimLookup;
   private final MultiValueHandling multiValueHandling;
   private final boolean hasBitmapIndexes;
-  private SortedDimensionDictionary sortedLookup;
   private boolean hasMultipleValues = false;
 
+  @Nullable
+  private SortedDimensionDictionary sortedLookup;
+
   public StringDimensionIndexer(MultiValueHandling multiValueHandling, boolean hasBitmapIndexes)
   {
     this.dimLookup = new DimensionDictionary();
@@ -252,7 +254,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
       final int nullId = dimLookup.getId(null);
       encodedDimensionValues = nullId == ABSENT_VALUE_ID ? new int[]{dimLookup.add(null)} : new int[]{nullId};
     } else if (dimValues instanceof List) {
-      List<Object> dimValuesList = (List) dimValues;
+      List<Object> dimValuesList = (List<Object>) dimValues;
       if (dimValuesList.isEmpty()) {
         dimLookup.add(null);
         encodedDimensionValues = IntArrays.EMPTY_ARRAY;
diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
index 050163b..6340502 100644
--- a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
@@ -77,27 +77,36 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
   private static final Indexed<String> NULL_STR_DIM_VAL = new ListIndexed<>(Collections.singletonList(null));
   private static final Splitter SPLITTER = Splitter.on(",");
 
-  private ColumnarIntsSerializer encodedValueSerializer;
+  private final String dimensionName;
+  private final ProgressIndicator progress;
+  private final Closer closer;
+  private final IndexSpec indexSpec;
+  private final SegmentWriteOutMedium segmentWriteOutMedium;
+  private final MutableBitmap nullRowsBitmap;
+  private final ColumnCapabilities capabilities;
 
-  private String dimensionName;
-  private GenericIndexedWriter<String> dictionaryWriter;
-  private String firstDictionaryValue;
   private int dictionarySize;
+  private int rowCount = 0;
+  private int cardinality = 0;
+  private boolean hasNull = false;
+
+  @Nullable
   private GenericIndexedWriter<ImmutableBitmap> bitmapWriter;
+  @Nullable
   private ByteBufferWriter<ImmutableRTree> spatialWriter;
+  @Nullable
   private ArrayList<IntBuffer> dimConversions;
-  private int cardinality = 0;
-  private boolean hasNull = false;
-  private MutableBitmap nullRowsBitmap;
-  private final SegmentWriteOutMedium segmentWriteOutMedium;
-  private int rowCount = 0;
-  private ColumnCapabilities capabilities;
+  @Nullable
   private List<IndexableAdapter> adapters;
-  private final IndexSpec indexSpec;
+  @Nullable
   private IndexMerger.DictionaryMergeIterator dictionaryMergeIterator;
+  @Nullable
+  private ColumnarIntsSerializer encodedValueSerializer;
+  @Nullable
+  private GenericIndexedWriter<String> dictionaryWriter;
+  @Nullable
+  private String firstDictionaryValue;
 
-  private final ProgressIndicator progress;
-  private final Closer closer;
 
   public StringDimensionMergerV9(
       String dimensionName,
@@ -537,13 +546,12 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
         .withBitmapIndex(bitmapWriter)
         .withSpatialIndex(spatialWriter)
         .withByteOrder(IndexIO.BYTE_ORDER);
-    final ColumnDescriptor serdeficator = builder
-        .addSerde(partBuilder.build())
-        .build();
 
     //log.info("Completed dimension column[%s] in %,d millis.", dimensionName, System.currentTimeMillis() - dimStartTime);
 
-    return serdeficator;
+    return builder
+        .addSerde(partBuilder.build())
+        .build();
   }
 
   protected interface IndexSeeker
diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java
index ce081df..86b9b86 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java
@@ -23,18 +23,25 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 
+import javax.annotation.Nullable;
+
 /**
  */
 public class ColumnBuilder
 {
+  @Nullable
   private ValueType type = null;
   private boolean hasMultipleValues = false;
   private boolean filterable = false;
+  private boolean dictionaryEncoded = false;
 
+  @Nullable
   private Supplier<? extends BaseColumn> columnSupplier = null;
-  private boolean dictionaryEncoded = false;
+  @Nullable
   private Supplier<BitmapIndex> bitmapIndex = null;
+  @Nullable
   private Supplier<SpatialIndex> spatialIndex = null;
+  @Nullable
   private SmooshedFileMapper fileMapper = null;
 
   public ColumnBuilder setFileMapper(SmooshedFileMapper fileMapper)
diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java
index 620a0f6..94cdbe3 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java
@@ -23,12 +23,16 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.ISE;
 
+import javax.annotation.Nullable;
+
 /**
  *
  */
 public class ColumnCapabilitiesImpl implements ColumnCapabilities
 {
+  @Nullable
   private ValueType type = null;
+
   private boolean dictionaryEncoded = false;
   private boolean runLengthEncoded = false;
   private boolean hasInvertedIndexes = false;
diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java
index 1307640..fbb5b1d 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java
@@ -28,6 +28,8 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.segment.serde.ColumnPartSerde;
 import org.apache.druid.segment.serde.Serializer;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
@@ -111,7 +113,9 @@ public class ColumnDescriptor implements Serializer
 
   public static class Builder
   {
+    @Nullable
     private ValueType valueType = null;
+    @Nullable
     private Boolean hasMultipleValues = null;
 
     private final List<ColumnPartSerde> parts = new ArrayList<>();
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
index 8c2db09..3558473 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
@@ -25,6 +25,8 @@ import org.apache.druid.segment.CompressedPools;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -45,6 +47,7 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
   private final CompressionStrategy compression;
 
   private int numInserted = 0;
+  @Nullable
   private ByteBuffer endBuffer;
 
   BlockLayoutColumnarDoublesSerializer(
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
index 84d2e26..aa225b8 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
@@ -25,6 +25,8 @@ import org.apache.druid.segment.CompressedPools;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -45,6 +47,7 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
   private final CompressionStrategy compression;
 
   private int numInserted = 0;
+  @Nullable
   private ByteBuffer endBuffer;
 
   BlockLayoutColumnarFloatsSerializer(
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
index c1851c7..778a061 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
@@ -24,6 +24,8 @@ import org.apache.druid.segment.CompressedPools;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -47,6 +49,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
   private int numInserted = 0;
   private int numInsertedForNextFlush;
 
+  @Nullable
   private ByteBuffer endBuffer;
 
   BlockLayoutColumnarLongsSerializer(
diff --git a/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java b/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
index 7152944..ee1e781 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
@@ -26,6 +26,8 @@ import org.apache.druid.segment.serde.Serializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 import org.apache.druid.segment.writeout.WriteOutBytes;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
 
@@ -36,7 +38,9 @@ public class ByteBufferWriter<T> implements Serializer
   private final SegmentWriteOutMedium segmentWriteOutMedium;
   private final ObjectStrategy<T> strategy;
 
+  @Nullable
   private WriteOutBytes headerOut = null;
+  @Nullable
   private WriteOutBytes valueOut = null;
 
   public ByteBufferWriter(SegmentWriteOutMedium segmentWriteOutMedium, ObjectStrategy<T> strategy)
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
index dacf62f..a5db9e2 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
@@ -24,6 +24,8 @@ import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -45,9 +47,11 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
   private final int chunkFactor;
   private final CompressionStrategy compression;
   private final GenericIndexedWriter<ByteBuffer> flattener;
-  private ByteBuffer endBuffer;
   private int numInserted;
 
+  @Nullable
+  private ByteBuffer endBuffer;
+
   CompressedColumnarIntsSerializer(
       final SegmentWriteOutMedium segmentWriteOutMedium,
       final String filenameBase,
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
index ffc1ee8..72f9fc9 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
@@ -25,6 +25,8 @@ import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -67,9 +69,10 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
   private final CompressionStrategy compression;
   private final GenericIndexedWriter<ByteBuffer> flattener;
   private final ByteBuffer intBuffer;
+  private int numInserted;
 
+  @Nullable
   private ByteBuffer endBuffer;
-  private int numInserted;
 
   CompressedVSizeColumnarIntsSerializer(
       final SegmentWriteOutMedium segmentWriteOutMedium,
diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
index e44a200..8a2bc57 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
@@ -198,6 +198,7 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
   private int logBaseTwoOfElementsPerValueFile;
   private int relativeIndexMask;
 
+  @Nullable
   private final ByteBuffer theBuffer;
 
   /**
diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
index 77bcd91..120b36b 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
@@ -140,10 +140,13 @@ public class GenericIndexedWriter<T> implements Serializer
   private boolean objectsSorted = true;
   @Nullable
   private T prevObject = null;
+  @Nullable
   private WriteOutBytes headerOut = null;
+  @Nullable
   private WriteOutBytes valuesOut = null;
   private int numWritten = 0;
   private boolean requireMultipleFiles = false;
+  @Nullable
   private LongList headerOutLong;
 
   private final ByteBuffer getOffsetBuffer = ByteBuffer.allocate(Integer.BYTES);
diff --git a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
index e57e81d..e08e463 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
@@ -27,6 +27,8 @@ import it.unimi.dsi.fastutil.longs.LongList;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteOrder;
 import java.nio.channels.WritableByteChannel;
@@ -42,7 +44,6 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali
   private final String filenameBase;
   private final ByteOrder order;
   private final CompressionStrategy compression;
-  private LongList tempOut = null;
 
   private int numInserted = 0;
 
@@ -52,6 +53,9 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali
   private long maxVal = Long.MIN_VALUE;
   private long minVal = Long.MAX_VALUE;
 
+  @Nullable
+  private LongList tempOut = null;
+  @Nullable
   private ColumnarLongsSerializer delegate;
 
   IntermediateColumnarLongsSerializer(
diff --git a/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java b/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
index b421105..2aeb194 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
@@ -21,6 +21,8 @@ package org.apache.druid.segment.data;
 
 import org.apache.druid.segment.writeout.WriteOutBytes;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -31,7 +33,9 @@ public class LongsLongEncodingWriter implements CompressionFactory.LongEncodingW
 
   private final ByteBuffer orderBuffer;
   private final ByteOrder order;
+  @Nullable
   private ByteBuffer outBuffer = null;
+  @Nullable
   private OutputStream outStream = null;
 
   public LongsLongEncodingWriter(ByteOrder order)
diff --git a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarIntsSerializer.java
index 90d4c20..ff646fc 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarIntsSerializer.java
@@ -26,6 +26,8 @@ import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 import org.apache.druid.segment.writeout.WriteOutBytes;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
@@ -46,9 +48,11 @@ public class VSizeColumnarIntsSerializer extends SingleValueColumnarIntsSerializ
   private final int numBytes;
 
   private final ByteBuffer helperBuffer = ByteBuffer.allocate(Integer.BYTES);
-  private WriteOutBytes valuesOut = null;
   private boolean bufPaddingWritten = false;
 
+  @Nullable
+  private WriteOutBytes valuesOut = null;
+
   public VSizeColumnarIntsSerializer(final SegmentWriteOutMedium segmentWriteOutMedium, final int maxValue)
   {
     this.segmentWriteOutMedium = segmentWriteOutMedium;
diff --git a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java
index d183d357..6a85a46 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java
@@ -26,6 +26,8 @@ import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 import org.apache.druid.segment.writeout.WriteOutBytes;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
 
@@ -83,7 +85,9 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize
   private final WriteInt writeInt;
 
   private final SegmentWriteOutMedium segmentWriteOutMedium;
+  @Nullable
   private WriteOutBytes headerOut = null;
+  @Nullable
   private WriteOutBytes valuesOut = null;
   private int numWritten = 0;
   private boolean numBytesForMaxWritten = false;
diff --git a/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java b/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java
index 3bfa68a..6741be5 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java
@@ -21,6 +21,8 @@ package org.apache.druid.segment.data;
 
 import org.apache.druid.java.util.common.IAE;
 
+import javax.annotation.Nullable;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -189,6 +191,7 @@ public class VSizeLongSerde
 
   private static final class Size1Ser implements LongSerializer
   {
+    @Nullable
     OutputStream output = null;
     ByteBuffer buffer;
     byte curByte = 0;
@@ -242,6 +245,7 @@ public class VSizeLongSerde
 
   private static final class Size2Ser implements LongSerializer
   {
+    @Nullable
     OutputStream output = null;
     ByteBuffer buffer;
     byte curByte = 0;
@@ -295,8 +299,8 @@ public class VSizeLongSerde
 
   private static final class Mult4Ser implements LongSerializer
   {
-
-    OutputStream output = null;
+    @Nullable
+    OutputStream output;
     ByteBuffer buffer;
     int numBytes;
     byte curByte = 0;
@@ -361,6 +365,7 @@ public class VSizeLongSerde
 
   private static final class Mult8Ser implements LongSerializer
   {
+    @Nullable
     OutputStream output;
     ByteBuffer buffer;
     int numBytes;
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index 515c475..8e21296 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -330,6 +330,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
 
   public static class Builder
   {
+    @Nullable
     private IncrementalIndexSchema incrementalIndexSchema;
     private boolean deserializeComplexMetrics;
     private boolean reportParseExceptions;
@@ -505,8 +506,8 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
 
   static class IncrementalIndexRowResult
   {
-    private IncrementalIndexRow incrementalIndexRow;
-    private List<String> parseExceptionMessages;
+    private final IncrementalIndexRow incrementalIndexRow;
+    private final List<String> parseExceptionMessages;
 
     IncrementalIndexRowResult(IncrementalIndexRow incrementalIndexRow, List<String> parseExceptionMessages)
     {
@@ -527,9 +528,9 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
 
   static class AddToFactsResult
   {
-    private int rowCount;
+    private final int rowCount;
     private final long bytesInMemory;
-    private List<String> parseExceptionMessages;
+    private final List<String> parseExceptionMessages;
 
     public AddToFactsResult(
         int rowCount,
@@ -997,53 +998,48 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
 
   public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs, final boolean descending)
   {
-    return new Iterable<Row>()
-    {
-      @Override
-      public Iterator<Row> iterator()
-      {
-        final List<DimensionDesc> dimensions = getDimensions();
-
-        return Iterators.transform(
-            getFacts().iterator(descending),
-            incrementalIndexRow -> {
-              final int rowOffset = incrementalIndexRow.getRowIndex();
-
-              Object[] theDims = incrementalIndexRow.getDims();
-
-              Map<String, Object> theVals = Maps.newLinkedHashMap();
-              for (int i = 0; i < theDims.length; ++i) {
-                Object dim = theDims[i];
-                DimensionDesc dimensionDesc = dimensions.get(i);
-                if (dimensionDesc == null) {
-                  continue;
-                }
-                String dimensionName = dimensionDesc.getName();
-                DimensionHandler handler = dimensionDesc.getHandler();
-                if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) {
-                  theVals.put(dimensionName, null);
-                  continue;
-                }
-                final DimensionIndexer indexer = dimensionDesc.getIndexer();
-                Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualList(dim);
-                theVals.put(dimensionName, rowVals);
+    return () -> {
+      final List<DimensionDesc> dimensions = getDimensions();
+
+      return Iterators.transform(
+          getFacts().iterator(descending),
+          incrementalIndexRow -> {
+            final int rowOffset = incrementalIndexRow.getRowIndex();
+
+            Object[] theDims = incrementalIndexRow.getDims();
+
+            Map<String, Object> theVals = Maps.newLinkedHashMap();
+            for (int i = 0; i < theDims.length; ++i) {
+              Object dim = theDims[i];
+              DimensionDesc dimensionDesc = dimensions.get(i);
+              if (dimensionDesc == null) {
+                continue;
               }
-
-              AggregatorType[] aggs = getAggsForRow(rowOffset);
-              for (int i = 0; i < aggs.length; ++i) {
-                theVals.put(metrics[i].getName(), getAggVal(aggs[i], rowOffset, i));
+              String dimensionName = dimensionDesc.getName();
+              DimensionHandler handler = dimensionDesc.getHandler();
+              if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) {
+                theVals.put(dimensionName, null);
+                continue;
               }
+              final DimensionIndexer indexer = dimensionDesc.getIndexer();
+              Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualList(dim);
+              theVals.put(dimensionName, rowVals);
+            }
 
-              if (postAggs != null) {
-                for (PostAggregator postAgg : postAggs) {
-                  theVals.put(postAgg.getName(), postAgg.compute(theVals));
-                }
-              }
+            AggregatorType[] aggs = getAggsForRow(rowOffset);
+            for (int i = 0; i < aggs.length; ++i) {
+              theVals.put(metrics[i].getName(), getAggVal(aggs[i], rowOffset, i));
+            }
 
-              return new MapBasedRow(incrementalIndexRow.getTimestamp(), theVals);
+            if (postAggs != null) {
+              for (PostAggregator postAgg : postAggs) {
+                theVals.put(postAgg.getName(), postAgg.compute(theVals));
+              }
             }
-        );
-      }
+
+            return new MapBasedRow(incrementalIndexRow.getTimestamp(), theVals);
+          }
+      );
     };
   }
 
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java
index 2493bdd..b896c9e 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java
@@ -48,6 +48,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
   private static class DimensionAccessor
   {
     private final IncrementalIndex.DimensionDesc dimensionDesc;
+    @Nullable
     private final MutableBitmap[] invertedIndexes;
     private final DimensionIndexer indexer;
 
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowHolder.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowHolder.java
index b9180dd..c46ef55 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowHolder.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowHolder.java
@@ -22,6 +22,8 @@ package org.apache.druid.segment.incremental;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.LongColumnSelector;
 
+import javax.annotation.Nullable;
+
 /**
  * IncrementalIndexRowHolder is a simple {@link #get}/{@link #set} holder of {@link IncrementalIndexRow}. It is used
  * to implement various machinery around {@link IncrementalIndex}, e. g. {@link
@@ -33,6 +35,7 @@ import org.apache.druid.segment.LongColumnSelector;
  */
 public class IncrementalIndexRowHolder implements LongColumnSelector
 {
+  @Nullable
   private IncrementalIndexRow currEntry = null;
 
   public IncrementalIndexRow get()
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
index 95c88fc..ac48901 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -33,6 +33,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.segment.ColumnSelectorFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -59,14 +61,17 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
 
   protected final int maxRowCount;
 
+  @Nullable
   private volatile Map<String, ColumnSelectorFactory> selectors;
 
   //given a ByteBuffer and an offset where all aggregates for a row are stored
   //offset + aggOffsetInBuffer[i] would give position in ByteBuffer where ith aggregate
   //is stored
+  @Nullable
   private volatile int[] aggOffsetInBuffer;
   private volatile int aggsTotalSize;
 
+  @Nullable
   private String outOfRowsReason = null;
 
   OffheapIncrementalIndex(
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 80e21a0..8e5c55b 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -60,8 +60,10 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
   private final long maxBytesPerRowForAggregators;
   protected final int maxRowCount;
   protected final long maxBytesInMemory;
-  private volatile Map<String, ColumnSelectorFactory> selectors;
 
+  @Nullable
+  private volatile Map<String, ColumnSelectorFactory> selectors;
+  @Nullable
   private String outOfRowsReason = null;
 
   OnheapIncrementalIndex(
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
index c362351..f10f53b 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
@@ -22,16 +22,15 @@ package org.apache.druid.segment.serde;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.segment.GenericColumnSerializer;
-import org.apache.druid.segment.column.ColumnBuilder;
-import org.apache.druid.segment.column.ColumnConfig;
 
-import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
 
 /**
  */
 public class ComplexColumnPartSerde implements ColumnPartSerde
 {
   private final String typeName;
+  @Nullable
   private final ComplexMetricSerde serde;
   private final Serializer serializer;
 
@@ -70,21 +69,18 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
   @Override
   public Deserializer getDeserializer()
   {
-    return new Deserializer()
-    {
-      @Override
-      public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig)
-      {
-        if (serde != null) {
-          serde.deserializeColumn(buffer, builder);
-        }
+    return (buffer, builder, columnConfig) -> {
+      if (serde != null) {
+        serde.deserializeColumn(buffer, builder);
       }
     };
   }
 
   public static class SerializerBuilder
   {
+    @Nullable
     private String typeName = null;
+    @Nullable
     private GenericColumnSerializer delegate = null;
 
     public SerializerBuilder withTypeName(final String typeName)
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
index 136eca6..7f0bc3d 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
@@ -143,13 +143,21 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
 
   public static class SerializerBuilder
   {
-    private VERSION version = null;
     private int flags = STARTING_FLAGS;
+
+    @Nullable
+    private VERSION version = null;
+    @Nullable
     private GenericIndexedWriter<String> dictionaryWriter = null;
+    @Nullable
     private ColumnarIntsSerializer valueWriter = null;
+    @Nullable
     private BitmapSerdeFactory bitmapSerdeFactory = null;
+    @Nullable
     private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = null;
+    @Nullable
     private ByteBufferWriter<ImmutableRTree> spatialIndexWriter = null;
+    @Nullable
     private ByteOrder byteOrder = null;
 
     public SerializerBuilder withDictionary(GenericIndexedWriter<String> dictionaryWriter)
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java
index 82ebfc2..5a08e91 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java
@@ -23,14 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Supplier;
 import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.column.ColumnBuilder;
-import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.ColumnarDoubles;
 import org.apache.druid.segment.data.CompressedColumnarDoublesSuppliers;
 
 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
+
 import java.nio.ByteOrder;
 
 public class DoubleNumericColumnPartSerde implements ColumnPartSerde
@@ -66,7 +64,9 @@ public class DoubleNumericColumnPartSerde implements ColumnPartSerde
 
   public static class SerializerBuilder
   {
+    @Nullable
     private ByteOrder byteOrder = null;
+    @Nullable
     private Serializer delegate = null;
 
     public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
@@ -97,24 +97,19 @@ public class DoubleNumericColumnPartSerde implements ColumnPartSerde
   @Override
   public Deserializer getDeserializer()
   {
-    return new Deserializer()
-    {
-      @Override
-      public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig)
-      {
-        final Supplier<ColumnarDoubles> column = CompressedColumnarDoublesSuppliers.fromByteBuffer(
-            buffer,
-            byteOrder
-        );
-        DoubleNumericColumnSupplier columnSupplier = new DoubleNumericColumnSupplier(
-            column,
-            IndexIO.LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
-        );
-        builder.setType(ValueType.DOUBLE)
-               .setHasMultipleValues(false)
-               .setNumericColumnSupplier(columnSupplier);
-
-      }
+    return (buffer, builder, columnConfig) -> {
+      final Supplier<ColumnarDoubles> column = CompressedColumnarDoublesSuppliers.fromByteBuffer(
+          buffer,
+          byteOrder
+      );
+      DoubleNumericColumnSupplier columnSupplier = new DoubleNumericColumnSupplier(
+          column,
+          IndexIO.LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
+      );
+      builder.setType(ValueType.DOUBLE)
+             .setHasMultipleValues(false)
+             .setNumericColumnSupplier(columnSupplier);
+
     };
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java
index b822dbf..2262184 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java
@@ -54,7 +54,7 @@ public class DoubleNumericColumnPartSerdeV2 implements ColumnPartSerde
 
   private final ByteOrder byteOrder;
   @Nullable
-  private Serializer serializer;
+  private final Serializer serializer;
   private final BitmapSerdeFactory bitmapSerdeFactory;
 
   public DoubleNumericColumnPartSerdeV2(
@@ -87,8 +87,11 @@ public class DoubleNumericColumnPartSerdeV2 implements ColumnPartSerde
 
   public static class SerializerBuilder
   {
+    @Nullable
     private ByteOrder byteOrder = null;
+    @Nullable
     private Serializer delegate = null;
+    @Nullable
     private BitmapSerdeFactory bitmapSerdeFactory = null;
 
     public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java
index 46cc40b..02ac221 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java
@@ -66,7 +66,9 @@ public class FloatNumericColumnPartSerde implements ColumnPartSerde
 
   public static class SerializerBuilder
   {
+    @Nullable
     private ByteOrder byteOrder = null;
+    @Nullable
     private Serializer delegate = null;
 
     public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java
index ccc748a..b3c71db 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java
@@ -52,7 +52,7 @@ public class FloatNumericColumnPartSerdeV2 implements ColumnPartSerde
 
   private final ByteOrder byteOrder;
   @Nullable
-  private Serializer serializer;
+  private final Serializer serializer;
   private final BitmapSerdeFactory bitmapSerdeFactory;
 
   private FloatNumericColumnPartSerdeV2(
@@ -85,8 +85,11 @@ public class FloatNumericColumnPartSerdeV2 implements ColumnPartSerde
 
   public static class SerializerBuilder
   {
+    @Nullable
     private ByteOrder byteOrder = null;
+    @Nullable
     private Serializer delegate = null;
+    @Nullable
     private BitmapSerdeFactory bitmapSerdeFactory = null;
 
     public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java
index 2dda2eb..3884875 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java
@@ -22,13 +22,11 @@ package org.apache.druid.segment.serde;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.column.ColumnBuilder;
-import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.CompressedColumnarLongsSupplier;
 
 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
+
 import java.nio.ByteOrder;
 
 /**
@@ -66,7 +64,9 @@ public class LongNumericColumnPartSerde implements ColumnPartSerde
 
   public static class SerializerBuilder
   {
+    @Nullable
     private ByteOrder byteOrder = null;
+    @Nullable
     private Serializer delegate = null;
 
     public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
@@ -97,23 +97,18 @@ public class LongNumericColumnPartSerde implements ColumnPartSerde
   @Override
   public Deserializer getDeserializer()
   {
-    return new Deserializer()
-    {
-      @Override
-      public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig)
-      {
-        final CompressedColumnarLongsSupplier column = CompressedColumnarLongsSupplier.fromByteBuffer(
-            buffer,
-            byteOrder
-        );
-        LongNumericColumnSupplier columnSupplier = new LongNumericColumnSupplier(
-            column,
-            IndexIO.LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
-        );
-        builder.setType(ValueType.LONG)
-               .setHasMultipleValues(false)
-               .setNumericColumnSupplier(columnSupplier);
-      }
+    return (buffer, builder, columnConfig) -> {
+      final CompressedColumnarLongsSupplier column = CompressedColumnarLongsSupplier.fromByteBuffer(
+          buffer,
+          byteOrder
+      );
+      LongNumericColumnSupplier columnSupplier = new LongNumericColumnSupplier(
+          column,
+          IndexIO.LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
+      );
+      builder.setType(ValueType.LONG)
+             .setHasMultipleValues(false)
+             .setNumericColumnSupplier(columnSupplier);
     };
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java
index ad27d81..7a46c51 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java
@@ -50,14 +50,16 @@ public class LongNumericColumnPartSerdeV2 implements ColumnPartSerde
     );
   }
 
+  @Nullable
+  private final Serializer serializer;
+  @Nullable
   private final ByteOrder byteOrder;
   @Nullable
-  private Serializer serializer;
   private final BitmapSerdeFactory bitmapSerdeFactory;
 
   private LongNumericColumnPartSerdeV2(
-      ByteOrder byteOrder,
-      BitmapSerdeFactory bitmapSerdeFactory,
+      @Nullable ByteOrder byteOrder,
+      @Nullable BitmapSerdeFactory bitmapSerdeFactory,
       @Nullable Serializer serializer
   )
   {
@@ -85,8 +87,11 @@ public class LongNumericColumnPartSerdeV2 implements ColumnPartSerde
 
   public static class SerializerBuilder
   {
+    @Nullable
     private ByteOrder byteOrder = null;
+    @Nullable
     private Serializer delegate = null;
+    @Nullable
     private BitmapSerdeFactory bitmapSerdeFactory = null;
 
     public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
index d8c1ef9..6842f8a 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
@@ -104,7 +104,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
    * {@link EventReceiverFirehose} that may change in the future.
    */
   private final long maxIdleTimeMillis;
-  private final @Nullable ChatHandlerProvider chatHandlerProvider;
+  private final ChatHandlerProvider chatHandlerProvider;
   private final ObjectMapper jsonMapper;
   private final ObjectMapper smileMapper;
   private final EventReceiverFirehoseRegister eventReceiverFirehoseRegister;
@@ -225,6 +225,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
      * This field and {@link #rowsRunOut} are not volatile because they are accessed only from {@link #hasMore()} and
      * {@link #nextRow()} methods that are called from a single thread according to {@link Firehose} spec.
      */
+    @Nullable
     private InputRow nextRow = null;
     private boolean rowsRunOut = false;
 
@@ -241,7 +242,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
      * If they were not volatile, NPE would be possible in {@link #delayedCloseExecutor}. See
      * https://shipilev.net/blog/2016/close-encounters-of-jmm-kind/#wishful-hb-actual for explanations.
      */
+    @Nullable
     private volatile Long idleCloseTimeNs = null;
+    @Nullable
     private volatile Long requestedShutdownTimeNs = null;
 
     EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java
index b8c6b85..446efa3 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java
@@ -40,6 +40,7 @@ public class PredicateFirehose implements Firehose
   private final Firehose firehose;
   private final Predicate<InputRow> predicate;
 
+  @Nullable
   private InputRow savedInputRow = null;
 
   public PredicateFirehose(Firehose firehose, Predicate<InputRow> predicate)
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java
index 43738f8..866be52 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java
@@ -35,6 +35,8 @@ import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.skife.jdbi.v2.exceptions.ResultSetException;
 import org.skife.jdbi.v2.exceptions.StatementException;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -51,6 +53,7 @@ public class SqlFirehoseFactory extends PrefetchSqlFirehoseFactory<String>
 {
   @JsonProperty
   private final List<String> sqls;
+  @Nullable
   @JsonProperty
   private final MetadataStorageConnectorConfig connectorConfig;
   private final ObjectMapper objectMapper;
diff --git a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
index fd53a1c..5c521bb 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
@@ -69,7 +69,8 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer
   private final ConcurrentMap<DataSegment, SegmentZNode> segmentLookup = new ConcurrentHashMap<>();
   private final Function<DataSegment, DataSegment> segmentTransformer;
 
-  private final ChangeRequestHistory<DataSegmentChangeRequest> changes = new ChangeRequestHistory();
+  private final ChangeRequestHistory<DataSegmentChangeRequest> changes = new ChangeRequestHistory<>();
+  @Nullable
   private final SegmentZNode dummyZnode;
 
   @Inject
@@ -87,20 +88,15 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer
     this.server = server;
 
     this.liveSegmentLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), server.getName());
-    segmentTransformer = new Function<DataSegment, DataSegment>()
-    {
-      @Override
-      public DataSegment apply(DataSegment input)
-      {
-        DataSegment rv = input;
-        if (config.isSkipDimensionsAndMetrics()) {
-          rv = rv.withDimensions(null).withMetrics(null);
-        }
-        if (config.isSkipLoadSpec()) {
-          rv = rv.withLoadSpec(null);
-        }
-        return rv;
+    segmentTransformer = input -> {
+      DataSegment rv = input;
+      if (config.isSkipDimensionsAndMetrics()) {
+        rv = rv.withDimensions(null).withMetrics(null);
+      }
+      if (config.isSkipLoadSpec()) {
+        rv = rv.withLoadSpec(null);
       }
+      return rv;
     };
 
     if (this.config.isSkipSegmentAnnouncementOnZk()) {
diff --git a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncerProvider.java b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncerProvider.java
index b4acbf7..b521594 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncerProvider.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncerProvider.java
@@ -21,15 +21,15 @@ package org.apache.druid.server.coordination;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 
-import javax.validation.constraints.NotNull;
+import javax.annotation.Nullable;
 
 /**
  */
 public class BatchDataSegmentAnnouncerProvider implements DataSegmentAnnouncerProvider
 {
   @JacksonInject
-  @NotNull
-  private BatchDataSegmentAnnouncer batchAnnouncer = null;
+  @Nullable
+  private final BatchDataSegmentAnnouncer batchAnnouncer = null;
 
   @Override
   public DataSegmentAnnouncer get()
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
index 8f38f9f..fc466d1 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
@@ -39,6 +39,7 @@ import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.Duration;
 
+import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletResponse;
 import java.io.InputStream;
 import java.net.URL;
@@ -84,12 +85,14 @@ public class ChangeRequestHttpSyncer<T>
   private final LifecycleLock startStopLock = new LifecycleLock();
 
   private final String logIdentity;
-  private ChangeRequestHistory.Counter counter = null;
   private long unstableStartTime = -1;
   private int consecutiveFailedAttemptCount = 0;
   private long lastSuccessfulSyncTime = 0;
   private long lastSyncTime = 0;
 
+  @Nullable
+  private ChangeRequestHistory.Counter counter = null;
+
   public ChangeRequestHttpSyncer(
       ObjectMapper smileMapper,
       HttpClient httpClient,
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index c2a58ba..cb08182 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -44,6 +44,8 @@ import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.timeline.DataSegment;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -240,14 +242,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
 
     addSegments(
         cachedSegments,
-        new DataSegmentChangeCallback()
-        {
-          @Override
-          public void execute()
-          {
-            log.info("Cache load took %,d ms", System.currentTimeMillis() - start);
-          }
-        }
+        () -> log.info("Cache load took %,d ms", System.currentTimeMillis() - start)
     );
   }
 
@@ -348,35 +343,30 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
       final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
       for (final DataSegment segment : segments) {
         loadingExecutor.submit(
-            new Runnable()
-            {
-              @Override
-              public void run()
-              {
+            () -> {
+              try {
+                log.info(
+                    "Loading segment[%d/%d][%s]",
+                    counter.incrementAndGet(),
+                    numSegments,
+                    segment.getId()
+                );
+                loadSegment(segment, callback);
                 try {
-                  log.info(
-                      "Loading segment[%d/%d][%s]",
-                      counter.incrementAndGet(),
-                      numSegments,
-                      segment.getId()
-                  );
-                  loadSegment(segment, callback);
-                  try {
-                    backgroundSegmentAnnouncer.announceSegment(segment);
-                  }
-                  catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw new SegmentLoadingException(e, "Loading Interrupted");
-                  }
+                  backgroundSegmentAnnouncer.announceSegment(segment);
                 }
-                catch (SegmentLoadingException e) {
-                  log.error(e, "[%s] failed to load", segment.getId());
-                  failedSegments.add(segment);
-                }
-                finally {
-                  latch.countDown();
+                catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  throw new SegmentLoadingException(e, "Loading Interrupted");
                 }
               }
+              catch (SegmentLoadingException e) {
+                log.error(e, "[%s] failed to load", segment.getId());
+                failedSegments.add(segment);
+              }
+              finally {
+                latch.countDown();
+              }
             }
         );
       }
@@ -427,28 +417,23 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
       announcer.unannounceSegment(segment);
       segmentsToDelete.add(segment);
 
-      Runnable runnable = new Runnable()
-      {
-        @Override
-        public void run()
-        {
-          try {
-            synchronized (segmentDeleteLock) {
-              if (segmentsToDelete.remove(segment)) {
-                segmentManager.dropSegment(segment);
-
-                File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getId().toString());
-                if (!segmentInfoCacheFile.delete()) {
-                  log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
-                }
+      Runnable runnable = () -> {
+        try {
+          synchronized (segmentDeleteLock) {
+            if (segmentsToDelete.remove(segment)) {
+              segmentManager.dropSegment(segment);
+
+              File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getId().toString());
+              if (!segmentInfoCacheFile.delete()) {
+                log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
               }
             }
           }
-          catch (Exception e) {
-            log.makeAlert(e, "Failed to remove segment! Possible resource leak!")
-               .addData("segment", segment)
-               .emit();
-          }
+        }
+        catch (Exception e) {
+          log.makeAlert(e, "Failed to remove segment! Possible resource leak!")
+             .addData("segment", segment)
+             .emit();
         }
       };
 
@@ -543,7 +528,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
                 );
               }
             },
-            () -> resolveWaitingFutures()
+            this::resolveWaitingFutures
         );
       }
       return requestStatuses.getIfPresent(changeRequest);
@@ -588,7 +573,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
     private final Object lock = new Object();
 
     private volatile boolean finished = false;
+    @Nullable
     private volatile ScheduledFuture startedAnnouncing = null;
+    @Nullable
     private volatile ScheduledFuture nextAnnoucement = null;
 
     public BackgroundSegmentAnnouncer(
@@ -755,6 +742,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
     }
 
     private final STATE state;
+    @Nullable
     private final String failureCause;
 
     public static final Status SUCCESS = new Status(STATE.SUCCESS, null);
@@ -763,7 +751,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
     @JsonCreator
     Status(
         @JsonProperty("state") STATE state,
-        @JsonProperty("failureCause") String failureCause
+        @JsonProperty("failureCause") @Nullable String failureCause
     )
     {
       Preconditions.checkNotNull(state, "state must be non-null");
@@ -782,6 +770,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
       return state;
     }
 
+    @Nullable
     @JsonProperty
     public String getFailureCause()
     {
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java
index ca56b10..734de20 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java
@@ -25,8 +25,6 @@ import com.google.inject.Inject;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -35,6 +33,8 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 
@@ -54,6 +54,7 @@ public class ZkCoordinator
   private final DruidServerMetadata me;
   private final CuratorFramework curator;
 
+  @Nullable
   private volatile PathChildrenCache loadQueueCache;
   private volatile boolean started = false;
   private final ExecutorService segmentLoadUnloadService;
@@ -107,22 +108,17 @@ public class ZkCoordinator
         curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
 
         loadQueueCache.getListenable().addListener(
-            new PathChildrenCacheListener()
-            {
-              @Override
-              public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
-              {
-                final ChildData child = event.getData();
-                switch (event.getType()) {
-                  case CHILD_ADDED:
-                    childAdded(child);
-                    break;
-                  case CHILD_REMOVED:
-                    log.info("zNode[%s] was removed", event.getData().getPath());
-                    break;
-                  default:
-                    log.info("Ignoring event[%s]", event);
-                }
+            (client, event) -> {
+              final ChildData child = event.getData();
+              switch (event.getType()) {
+                case CHILD_ADDED:
+                  childAdded(child);
+                  break;
+                case CHILD_REMOVED:
+                  log.info("zNode[%s] was removed", event.getData().getPath());
+                  break;
+                default:
+                  log.info("Ignoring event[%s]", event);
               }
             }
 
@@ -151,25 +147,20 @@ public class ZkCoordinator
 
         finalRequest.go(
             dataSegmentChangeHandler,
-            new DataSegmentChangeCallback()
-            {
-              @Override
-              public void execute()
-              {
+            () -> {
+              try {
+                curator.delete().guaranteed().forPath(path);
+                log.info("Completed request [%s]", finalRequest.asString());
+              }
+              catch (Exception e) {
                 try {
                   curator.delete().guaranteed().forPath(path);
-                  log.info("Completed request [%s]", finalRequest.asString());
                 }
-                catch (Exception e) {
-                  try {
-                    curator.delete().guaranteed().forPath(path);
-                  }
-                  catch (Exception e1) {
-                    log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
-                  }
-                  log.error(e, "Exception while removing zNode[%s]", path);
-                  throw new RuntimeException(e);
+                catch (Exception e1) {
+                  log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
                 }
+                log.error(e, "Exception while removing zNode[%s]", path);
+                throw new RuntimeException(e);
               }
             }
         );


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org