You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by su...@apache.org on 2019/02/15 21:36:53 UTC
[incubator-druid] branch master updated: 2528 Replace Incremental Index Global Flags with Getters (#7043)
This is an automated email from the ASF dual-hosted git repository.
surekha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new c7eeeab 2528 Replace Incremental Index Global Flags with Getters (#7043)
c7eeeab is described below
commit c7eeeabf458b9a9afce086f4e5fbc456bf92d8e2
Author: Justin Borromeo <jb...@edu.uwaterloo.ca>
AuthorDate: Fri Feb 15 13:36:46 2019 -0800
2528 Replace Incremental Index Global Flags with Getters (#7043)
* Eliminated reportParseExceptions and deserializeComplexMetrics
* Removed more global flags
* Cleanup
* Addressed Surekha's recommendations
---
.../segment/incremental/IncrementalIndex.java | 55 ++++++++++++++--------
.../incremental/OffheapIncrementalIndex.java | 21 ++++-----
.../incremental/OnheapIncrementalIndex.java | 15 +++---
.../OnheapIncrementalIndexBenchmark.java | 13 ++---
.../druid/segment/realtime/plumber/Sink.java | 2 +-
5 files changed, 58 insertions(+), 48 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index 865cbf4..39df024 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -97,8 +97,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
-/**
- */
public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex implements Iterable<Row>, Closeable
{
private volatile DateTime maxIngestedEventTime;
@@ -250,7 +248,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
/**
* Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
* should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
- *
+ * <p>
* Set concurrentEventAdd to true to indicate that adding of input row should be thread-safe (for example, groupBy
* where the multiple threads can add concurrently to the IncrementalIndex).
*
@@ -482,12 +480,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
// Note: This method needs to be thread safe.
protected abstract AddToFactsResult addToFacts(
- AggregatorFactory[] metrics,
- boolean deserializeComplexMetrics,
- boolean reportParseExceptions,
InputRow row,
- AtomicInteger numEntries,
- AtomicLong sizeInBytes,
IncrementalIndexRow key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier,
@@ -608,12 +601,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
{
IncrementalIndexRowResult incrementalIndexRowResult = toIncrementalIndexRow(row);
final AddToFactsResult addToFactsResult = addToFacts(
- metrics,
- deserializeComplexMetrics,
- reportParseExceptions,
row,
- numEntries,
- bytesInMemory,
incrementalIndexRowResult.getIncrementalIndexRow(),
in,
rowSupplier,
@@ -625,7 +613,11 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
incrementalIndexRowResult.getParseExceptionMessages(),
addToFactsResult.getParseExceptionMessages()
);
- return new IncrementalIndexAddResult(addToFactsResult.getRowCount(), addToFactsResult.getBytesInMemory(), parseException);
+ return new IncrementalIndexAddResult(
+ addToFactsResult.getRowCount(),
+ addToFactsResult.getBytesInMemory(),
+ parseException
+ );
}
@VisibleForTesting
@@ -785,9 +777,29 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
return numEntries.get();
}
- public long getBytesInMemory()
+ boolean getDeserializeComplexMetrics()
+ {
+ return deserializeComplexMetrics;
+ }
+
+ boolean getReportParseExceptions()
+ {
+ return reportParseExceptions;
+ }
+
+ AtomicInteger getNumEntries()
+ {
+ return numEntries;
+ }
+
+ AggregatorFactory[] getMetrics()
{
- return bytesInMemory.get();
+ return metrics;
+ }
+
+ public AtomicLong getBytesInMemory()
+ {
+ return bytesInMemory;
}
private long getMinTimeMillis()
@@ -908,7 +920,10 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
* Index dimension ordering could be changed to initialize from DimensionsSpec after resolution of
* https://github.com/apache/incubator-druid/issues/2011
*/
- public void loadDimensionIterable(Iterable<String> oldDimensionOrder, Map<String, ColumnCapabilitiesImpl> oldColumnCapabilities)
+ public void loadDimensionIterable(
+ Iterable<String> oldDimensionOrder,
+ Map<String, ColumnCapabilitiesImpl> oldColumnCapabilities
+ )
{
synchronized (dimensionDescs) {
if (!dimensionDescs.isEmpty()) {
@@ -1289,7 +1304,9 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
public Iterator<IncrementalIndexRow> iterator(boolean descending)
{
if (descending && sortFacts) {
- return ((ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow>) facts).descendingMap().keySet().iterator();
+ return ((ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow>) facts).descendingMap()
+ .keySet()
+ .iterator();
}
return keySet().iterator();
}
@@ -1387,7 +1404,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
{
if (descending && sortFacts) {
return timeOrderedConcat(((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts)
- .descendingMap().values(), true).iterator();
+ .descendingMap().values(), true).iterator();
}
return timeOrderedConcat(facts.values(), false).iterator();
}
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
index f18f768..95c88fc 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -40,9 +40,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
/**
+ *
*/
public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
{
@@ -133,19 +133,15 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
}
- aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length - 1].getMaxIntermediateSizeWithNulls();
+ aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length
+ - 1].getMaxIntermediateSizeWithNulls();
return new BufferAggregator[metrics.length];
}
@Override
protected AddToFactsResult addToFacts(
- AggregatorFactory[] metrics,
- boolean deserializeComplexMetrics,
- boolean reportParseExceptions,
InputRow row,
- AtomicInteger numEntries,
- AtomicLong sizeInBytes, // ignored, added to make abstract class method impl happy
IncrementalIndexRow key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier,
@@ -157,6 +153,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
int bufferOffset;
synchronized (this) {
+ final AggregatorFactory[] metrics = getMetrics();
final int priorIndex = facts.getPriorIndex(key);
if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
final int[] indexAndOffset = indexAndOffsets.get(priorIndex);
@@ -202,7 +199,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
// Last ditch sanity checks
- if (numEntries.get() >= maxRowCount && facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) {
+ if (getNumEntries().get() >= maxRowCount && facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) {
throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
}
@@ -213,7 +210,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
indexAndOffsets.add(new int[]{bufferIndex, bufferOffset});
final int prev = facts.putIfAbsent(key, rowIndex);
if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) {
- numEntries.incrementAndGet();
+ getNumEntries().incrementAndGet();
} else {
throw new ISE("WTF! we are in sychronized block.");
}
@@ -222,7 +219,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
rowContainer.set(row);
- for (int i = 0; i < metrics.length; i++) {
+ for (int i = 0; i < getMetrics().length; i++) {
final BufferAggregator agg = getAggs()[i];
synchronized (agg) {
@@ -231,7 +228,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
catch (ParseException e) {
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
- if (reportParseExceptions) {
+ if (getReportParseExceptions()) {
throw new ParseException(e, "Encountered parse error for aggregator[%s]", getMetricAggs()[i].getName());
} else {
log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName());
@@ -240,7 +237,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
}
rowContainer.set(null);
- return new AddToFactsResult(numEntries.get(), 0, new ArrayList<>());
+ return new AddToFactsResult(getNumEntries().get(), 0, new ArrayList<>());
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 6f49730..cd65797 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
+ *
*/
public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
@@ -105,7 +106,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
long maxAggregatorIntermediateSize = Integer.BYTES * incrementalIndexSchema.getMetrics().length;
maxAggregatorIntermediateSize += Arrays.stream(incrementalIndexSchema.getMetrics())
- .mapToLong(aggregator -> aggregator.getMaxIntermediateSizeWithNulls() + Long.BYTES * 2)
+ .mapToLong(aggregator -> aggregator.getMaxIntermediateSizeWithNulls()
+ + Long.BYTES * 2)
.sum();
return maxAggregatorIntermediateSize;
}
@@ -140,12 +142,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
@Override
protected AddToFactsResult addToFacts(
- AggregatorFactory[] metrics,
- boolean deserializeComplexMetrics,
- boolean reportParseExceptions,
InputRow row,
- AtomicInteger numEntries,
- AtomicLong sizeInBytes,
IncrementalIndexRow key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier,
@@ -156,7 +153,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
final int priorIndex = facts.getPriorIndex(key);
Aggregator[] aggs;
-
+ final AggregatorFactory[] metrics = getMetrics();
+ final AtomicInteger numEntries = getNumEntries();
+ final AtomicLong sizeInBytes = getBytesInMemory();
if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
aggs = concurrentGet(priorIndex);
parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
@@ -301,7 +300,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
final boolean countCheck = size() < maxRowCount;
// if maxBytesInMemory = -1, then ignore sizeCheck
- final boolean sizeCheck = maxBytesInMemory <= 0 || getBytesInMemory() < maxBytesInMemory;
+ final boolean sizeCheck = maxBytesInMemory <= 0 || getBytesInMemory().get() < maxBytesInMemory;
final boolean canAdd = countCheck && sizeCheck;
if (!countCheck && !sizeCheck) {
outOfRowsReason = StringUtils.format(
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
index 096e4d3..886260a 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
@@ -171,12 +171,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
@Override
protected AddToFactsResult addToFacts(
- AggregatorFactory[] metrics,
- boolean deserializeComplexMetrics,
- boolean reportParseExceptions,
InputRow row,
- AtomicInteger numEntries,
- AtomicLong sizeInBytes,
IncrementalIndexRow key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier,
@@ -187,7 +182,9 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
final Integer priorIdex = getFacts().getPriorIndex(key);
Aggregator[] aggs;
-
+ final AggregatorFactory[] metrics = getMetrics();
+ final AtomicInteger numEntries = getNumEntries();
+ final AtomicLong sizeInBytes = getBytesInMemory();
if (null != priorIdex) {
aggs = indexedMap.get(priorIdex);
} else {
@@ -196,7 +193,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
- makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)
+ makeColumnSelectorFactory(agg, rowSupplier, getDeserializeComplexMetrics())
);
}
Integer rowIndex;
@@ -233,7 +230,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
}
catch (ParseException e) {
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
- if (reportParseExceptions) {
+ if (getReportParseExceptions()) {
throw e;
}
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
index 442b893..d2d72ba 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
@@ -279,7 +279,7 @@ public class Sink implements Iterable<FireHydrant>
return 0;
}
- return currHydrant.getIndex().getBytesInMemory();
+ return currHydrant.getIndex().getBytesInMemory().get();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org