You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/06/17 08:12:06 UTC

[GitHub] [incubator-druid] eranmeir commented on a change in pull request #7838: Improve IncrementalIndex concurrency scalability

eranmeir commented on a change in pull request #7838: Improve IncrementalIndex concurrency scalability
URL: https://github.com/apache/incubator-druid/pull/7838#discussion_r294179263
 
 

 ##########
 File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
 ##########
 @@ -627,98 +624,96 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row)
     if (row.getTimestampFromEpoch() < minTimestamp) {
       throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, DateTimes.utc(minTimestamp));
     }
-
     final List<String> rowDimensions = row.getDimensions();
 
-    Object[] dims;
-    List<Object> overflow = null;
+    Map<String, Object> rowDimKeys = new HashMap<>();
     long dimsKeySize = 0;
     List<String> parseExceptionMessages = new ArrayList<>();
-    synchronized (dimensionDescs) {
-      dims = new Object[dimensionDescs.size()];
-      for (String dimension : rowDimensions) {
-        if (Strings.isNullOrEmpty(dimension)) {
-          continue;
-        }
-        boolean wasNewDim = false;
-        ColumnCapabilitiesImpl capabilities;
-        DimensionDesc desc = dimensionDescs.get(dimension);
-        if (desc != null) {
-          capabilities = desc.getCapabilities();
-        } else {
-          wasNewDim = true;
-          capabilities = columnCapabilities.get(dimension);
-          if (capabilities == null) {
-            capabilities = new ColumnCapabilitiesImpl();
-            // For schemaless type discovery, assume everything is a String for now, can change later.
-            capabilities.setType(ValueType.STRING);
-            capabilities.setDictionaryEncoded(true);
-            capabilities.setHasBitmapIndexes(true);
-            columnCapabilities.put(dimension, capabilities);
-          }
-          DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dimension, capabilities, null);
-          desc = addNewDimension(dimension, capabilities, handler);
-        }
-        DimensionHandler handler = desc.getHandler();
-        DimensionIndexer indexer = desc.getIndexer();
-        Object dimsKey = null;
-        try {
-          dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(
-              row.getRaw(dimension),
-              true
-          );
+
+    DimensionData prevDimensionData = this.dimensions.get();
+    DimensionData dimensionData = null;
+    for (String dimension : rowDimensions) {
+      if (Strings.isNullOrEmpty(dimension)) {
+        continue;
+      }
+
+      if (rowDimKeys.containsKey(dimension)) {
+        // If the dims map already contains a mapping at this index, it means we have seen this dimension already on this input row.
+        throw new ISE("Dimension[%s] occurred more than once in InputRow", dimension);
+      }
+      ColumnCapabilitiesImpl capabilities;
+      DimensionDesc desc = prevDimensionData.getDimensionDesc(dimension);
+      if (desc != null) {
+        capabilities = desc.getCapabilities();
+      } else {
+        if (dimensionData == null) {
+          dimensionData = prevDimensionData.clone();
         }
-        catch (ParseException pe) {
-          parseExceptionMessages.add(pe.getMessage());
+        capabilities = dimensionData.getDimensionCapabilities(dimension);
+        if (capabilities == null) {
+          capabilities = new ColumnCapabilitiesImpl();
+          // For schemaless type discovery, assume everything is a String for now, can change later.
+          capabilities.setType(ValueType.STRING);
+          capabilities.setDictionaryEncoded(true);
+          capabilities.setHasBitmapIndexes(true);
+          dimensionData.putCapabilities(dimension, capabilities);
         }
-        dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey);
-        // Set column capabilities as data is coming in
-        if (!capabilities.hasMultipleValues() &&
-            dimsKey != null &&
-            handler.getLengthOfEncodedKeyComponent(dimsKey) > 1) {
-          capabilities.setHasMultipleValues(true);
+        DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dimension, capabilities, null);
+        if (dimensionData == null) {
+          dimensionData = prevDimensionData.clone();
         }
-
-        if (wasNewDim) {
-          if (overflow == null) {
-            overflow = new ArrayList<>();
-          }
-          overflow.add(dimsKey);
-        } else if (desc.getIndex() > dims.length || dims[desc.getIndex()] != null) {
-          /*
-           * index > dims.length requires that we saw this dimension and added it to the dimensionOrder map,
-           * otherwise index is null. Since dims is initialized based on the size of dimensionOrder on each call to add,
-           * it must have been added to dimensionOrder during this InputRow.
-           *
-           * if we found an index for this dimension it means we've seen it already. If !(index > dims.length) then
-           * we saw it on a previous input row (this its safe to index into dims). If we found a value in
-           * the dims array for this index, it means we have seen this dimension already on this input row.
-           */
-          throw new ISE("Dimension[%s] occurred more than once in InputRow", dimension);
-        } else {
-          dims[desc.getIndex()] = dimsKey;
+        desc = dimensionData.addNewDimension(dimension, capabilities, handler);
+      }
+      DimensionHandler handler = desc.getHandler();
+      DimensionIndexer indexer = desc.getIndexer();
+      Object dimsKey = null;
+      try {
+        dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(
+            row.getRaw(dimension),
+            true
+        );
+      }
+      catch (ParseException pe) {
+        parseExceptionMessages.add(pe.getMessage());
+      }
+      dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey);
+      // Set column capabilities as data is coming in
+      if (!capabilities.hasMultipleValues() &&
+          dimsKey != null &&
+          handler.getLengthOfEncodedKeyComponent(dimsKey) > 1) {
+        if (dimensionData == null) {
+          dimensionData = prevDimensionData.clone();
         }
+        capabilities = dimensionData.getDimensionCapabilities(dimension);
+        capabilities.setHasMultipleValues(true);
       }
+      rowDimKeys.put(dimension, dimsKey);
     }
 
-    if (overflow != null) {
-      // Merge overflow and non-overflow
-      Object[] newDims = new Object[dims.length + overflow.size()];
-      System.arraycopy(dims, 0, newDims, 0, dims.length);
-      for (int i = 0; i < overflow.size(); ++i) {
-        newDims[dims.length + i] = overflow.get(i);
+
+    if (dimensionData != null) {
+      while (!dimensions.compareAndSet(prevDimensionData, dimensionData)) {
 
 Review comment:
  From what I gather, there isn't much contention when updating dimension data. The motivation for using CAS rather than a lock was to avoid blocking thread-state transitions, which can trigger costly context switches.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org