You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by su...@apache.org on 2019/02/15 21:36:53 UTC

[incubator-druid] branch master updated: 2528 Replace Incremental Index Global Flags with Getters (#7043)

This is an automated email from the ASF dual-hosted git repository.

surekha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c7eeeab  2528 Replace Incremental Index Global Flags with Getters (#7043)
c7eeeab is described below

commit c7eeeabf458b9a9afce086f4e5fbc456bf92d8e2
Author: Justin Borromeo <jb...@edu.uwaterloo.ca>
AuthorDate: Fri Feb 15 13:36:46 2019 -0800

    2528 Replace Incremental Index Global Flags with Getters (#7043)
    
    * Eliminated reportParseExceptions and deserializeComplexMetrics
    
    * Removed more global flags
    
    * Cleanup
    
    * Addressed Surekha's recommendations
---
 .../segment/incremental/IncrementalIndex.java      | 55 ++++++++++++++--------
 .../incremental/OffheapIncrementalIndex.java       | 21 ++++-----
 .../incremental/OnheapIncrementalIndex.java        | 15 +++---
 .../OnheapIncrementalIndexBenchmark.java           | 13 ++---
 .../druid/segment/realtime/plumber/Sink.java       |  2 +-
 5 files changed, 58 insertions(+), 48 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index 865cbf4..39df024 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -97,8 +97,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
-/**
- */
 public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex implements Iterable<Row>, Closeable
 {
   private volatile DateTime maxIngestedEventTime;
@@ -250,7 +248,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
   /**
    * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
    * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
-   *
+   * <p>
    * Set concurrentEventAdd to true to indicate that adding of input row should be thread-safe (for example, groupBy
    * where the multiple threads can add concurrently to the IncrementalIndex).
    *
@@ -482,12 +480,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
 
   // Note: This method needs to be thread safe.
   protected abstract AddToFactsResult addToFacts(
-      AggregatorFactory[] metrics,
-      boolean deserializeComplexMetrics,
-      boolean reportParseExceptions,
       InputRow row,
-      AtomicInteger numEntries,
-      AtomicLong sizeInBytes,
       IncrementalIndexRow key,
       ThreadLocal<InputRow> rowContainer,
       Supplier<InputRow> rowSupplier,
@@ -608,12 +601,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
   {
     IncrementalIndexRowResult incrementalIndexRowResult = toIncrementalIndexRow(row);
     final AddToFactsResult addToFactsResult = addToFacts(
-        metrics,
-        deserializeComplexMetrics,
-        reportParseExceptions,
         row,
-        numEntries,
-        bytesInMemory,
         incrementalIndexRowResult.getIncrementalIndexRow(),
         in,
         rowSupplier,
@@ -625,7 +613,11 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
         incrementalIndexRowResult.getParseExceptionMessages(),
         addToFactsResult.getParseExceptionMessages()
     );
-    return new IncrementalIndexAddResult(addToFactsResult.getRowCount(), addToFactsResult.getBytesInMemory(), parseException);
+    return new IncrementalIndexAddResult(
+        addToFactsResult.getRowCount(),
+        addToFactsResult.getBytesInMemory(),
+        parseException
+    );
   }
 
   @VisibleForTesting
@@ -785,9 +777,29 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
     return numEntries.get();
   }
 
-  public long getBytesInMemory()
+  boolean getDeserializeComplexMetrics()
+  {
+    return deserializeComplexMetrics;
+  }
+
+  boolean getReportParseExceptions()
+  {
+    return reportParseExceptions;
+  }
+
+  AtomicInteger getNumEntries()
+  {
+    return numEntries;
+  }
+
+  AggregatorFactory[] getMetrics()
   {
-    return bytesInMemory.get();
+    return metrics;
+  }
+
+  public AtomicLong getBytesInMemory()
+  {
+    return bytesInMemory;
   }
 
   private long getMinTimeMillis()
@@ -908,7 +920,10 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
    * Index dimension ordering could be changed to initialize from DimensionsSpec after resolution of
    * https://github.com/apache/incubator-druid/issues/2011
    */
-  public void loadDimensionIterable(Iterable<String> oldDimensionOrder, Map<String, ColumnCapabilitiesImpl> oldColumnCapabilities)
+  public void loadDimensionIterable(
+      Iterable<String> oldDimensionOrder,
+      Map<String, ColumnCapabilitiesImpl> oldColumnCapabilities
+  )
   {
     synchronized (dimensionDescs) {
       if (!dimensionDescs.isEmpty()) {
@@ -1289,7 +1304,9 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
     public Iterator<IncrementalIndexRow> iterator(boolean descending)
     {
       if (descending && sortFacts) {
-        return ((ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow>) facts).descendingMap().keySet().iterator();
+        return ((ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow>) facts).descendingMap()
+                                                                                         .keySet()
+                                                                                         .iterator();
       }
       return keySet().iterator();
     }
@@ -1387,7 +1404,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
     {
       if (descending && sortFacts) {
         return timeOrderedConcat(((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts)
-                .descendingMap().values(), true).iterator();
+                                     .descendingMap().values(), true).iterator();
       }
       return timeOrderedConcat(facts.values(), false).iterator();
     }
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
index f18f768..95c88fc 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -40,9 +40,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
+ *
  */
 public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
 {
@@ -133,19 +133,15 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
       }
     }
 
-    aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length - 1].getMaxIntermediateSizeWithNulls();
+    aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length
+                                                                    - 1].getMaxIntermediateSizeWithNulls();
 
     return new BufferAggregator[metrics.length];
   }
 
   @Override
   protected AddToFactsResult addToFacts(
-      AggregatorFactory[] metrics,
-      boolean deserializeComplexMetrics,
-      boolean reportParseExceptions,
       InputRow row,
-      AtomicInteger numEntries,
-      AtomicLong sizeInBytes, // ignored, added to make abstract class method impl happy
       IncrementalIndexRow key,
       ThreadLocal<InputRow> rowContainer,
       Supplier<InputRow> rowSupplier,
@@ -157,6 +153,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
     int bufferOffset;
 
     synchronized (this) {
+      final AggregatorFactory[] metrics = getMetrics();
       final int priorIndex = facts.getPriorIndex(key);
       if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
         final int[] indexAndOffset = indexAndOffsets.get(priorIndex);
@@ -202,7 +199,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
         }
 
         // Last ditch sanity checks
-        if (numEntries.get() >= maxRowCount && facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) {
+        if (getNumEntries().get() >= maxRowCount && facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) {
           throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
         }
 
@@ -213,7 +210,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
         indexAndOffsets.add(new int[]{bufferIndex, bufferOffset});
         final int prev = facts.putIfAbsent(key, rowIndex);
         if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) {
-          numEntries.incrementAndGet();
+          getNumEntries().incrementAndGet();
         } else {
           throw new ISE("WTF! we are in sychronized block.");
         }
@@ -222,7 +219,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
 
     rowContainer.set(row);
 
-    for (int i = 0; i < metrics.length; i++) {
+    for (int i = 0; i < getMetrics().length; i++) {
       final BufferAggregator agg = getAggs()[i];
 
       synchronized (agg) {
@@ -231,7 +228,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
         }
         catch (ParseException e) {
           // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
-          if (reportParseExceptions) {
+          if (getReportParseExceptions()) {
             throw new ParseException(e, "Encountered parse error for aggregator[%s]", getMetricAggs()[i].getName());
           } else {
             log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName());
@@ -240,7 +237,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
       }
     }
     rowContainer.set(null);
-    return new AddToFactsResult(numEntries.get(), 0, new ArrayList<>());
+    return new AddToFactsResult(getNumEntries().get(), 0, new ArrayList<>());
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 6f49730..cd65797 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
+ *
  */
 public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
 {
@@ -105,7 +106,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
   {
     long maxAggregatorIntermediateSize = Integer.BYTES * incrementalIndexSchema.getMetrics().length;
     maxAggregatorIntermediateSize += Arrays.stream(incrementalIndexSchema.getMetrics())
-                                           .mapToLong(aggregator -> aggregator.getMaxIntermediateSizeWithNulls() + Long.BYTES * 2)
+                                           .mapToLong(aggregator -> aggregator.getMaxIntermediateSizeWithNulls()
+                                                                    + Long.BYTES * 2)
                                            .sum();
     return maxAggregatorIntermediateSize;
   }
@@ -140,12 +142,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
 
   @Override
   protected AddToFactsResult addToFacts(
-      AggregatorFactory[] metrics,
-      boolean deserializeComplexMetrics,
-      boolean reportParseExceptions,
       InputRow row,
-      AtomicInteger numEntries,
-      AtomicLong sizeInBytes,
       IncrementalIndexRow key,
       ThreadLocal<InputRow> rowContainer,
       Supplier<InputRow> rowSupplier,
@@ -156,7 +153,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
     final int priorIndex = facts.getPriorIndex(key);
 
     Aggregator[] aggs;
-
+    final AggregatorFactory[] metrics = getMetrics();
+    final AtomicInteger numEntries = getNumEntries();
+    final AtomicLong sizeInBytes = getBytesInMemory();
     if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
       aggs = concurrentGet(priorIndex);
       parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
@@ -301,7 +300,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
   {
     final boolean countCheck = size() < maxRowCount;
     // if maxBytesInMemory = -1, then ignore sizeCheck
-    final boolean sizeCheck = maxBytesInMemory <= 0 || getBytesInMemory() < maxBytesInMemory;
+    final boolean sizeCheck = maxBytesInMemory <= 0 || getBytesInMemory().get() < maxBytesInMemory;
     final boolean canAdd = countCheck && sizeCheck;
     if (!countCheck && !sizeCheck) {
       outOfRowsReason = StringUtils.format(
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
index 096e4d3..886260a 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
@@ -171,12 +171,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
 
     @Override
     protected AddToFactsResult addToFacts(
-        AggregatorFactory[] metrics,
-        boolean deserializeComplexMetrics,
-        boolean reportParseExceptions,
         InputRow row,
-        AtomicInteger numEntries,
-        AtomicLong sizeInBytes,
         IncrementalIndexRow key,
         ThreadLocal<InputRow> rowContainer,
         Supplier<InputRow> rowSupplier,
@@ -187,7 +182,9 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
       final Integer priorIdex = getFacts().getPriorIndex(key);
 
       Aggregator[] aggs;
-
+      final AggregatorFactory[] metrics = getMetrics();
+      final AtomicInteger numEntries = getNumEntries();
+      final AtomicLong sizeInBytes = getBytesInMemory();
       if (null != priorIdex) {
         aggs = indexedMap.get(priorIdex);
       } else {
@@ -196,7 +193,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
         for (int i = 0; i < metrics.length; i++) {
           final AggregatorFactory agg = metrics[i];
           aggs[i] = agg.factorize(
-              makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)
+              makeColumnSelectorFactory(agg, rowSupplier, getDeserializeComplexMetrics())
           );
         }
         Integer rowIndex;
@@ -233,7 +230,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
           }
           catch (ParseException e) {
             // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
-            if (reportParseExceptions) {
+            if (getReportParseExceptions()) {
               throw e;
             }
           }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
index 442b893..d2d72ba 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
@@ -279,7 +279,7 @@ public class Sink implements Iterable<FireHydrant>
         return 0;
       }
 
-      return currHydrant.getIndex().getBytesInMemory();
+      return currHydrant.getIndex().getBytesInMemory().get();
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org