You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/07/08 20:18:20 UTC

[incubator-pinot] branch master updated: CompletionConfig for realtime tables (#4367)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 02f6181  CompletionConfig for realtime tables (#4367)
02f6181 is described below

commit 02f6181e820d2905f3dc20ee7843cef08d0d8ec2
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Jul 8 13:18:13 2019 -0700

    CompletionConfig for realtime tables (#4367)
    
    Adds a completion config for realtime tables to specify completion-related settings, such as the completion mode (DOWNLOAD, DEFAULT).
---
 .../pinot/common/config/CompletionConfig.java      | 63 ++++++++++++++++
 .../SegmentsValidationAndRetentionConfig.java      | 30 +++++---
 .../apache/pinot/common/utils/CommonConstants.java |  8 ++
 .../pinot/common/config/TableConfigTest.java       | 28 +++++++
 .../realtime/LLRealtimeSegmentDataManager.java     | 85 +++++++++++++++-------
 5 files changed, 179 insertions(+), 35 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/CompletionConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/CompletionConfig.java
new file mode 100644
index 0000000..9eb072d
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/CompletionConfig.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.config;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.pinot.common.utils.EqualityUtils;
+
+
+/**
+ * Table-level settings that control how a realtime segment is completed.
+ * The only setting so far is the completion mode, which is held as a plain string.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CompletionConfig {
+
+  @ConfigKey(value = "completionMode")
+  @ConfigDoc(value = "Mode to use when completing segment. DEFAULT for default strategy (build segment if segment is equivalent to the committed segment, else download). DOWNLOAD for always download the segment, never build.", mandatory = false)
+  private String _completionMode;
+
+  /** Returns the completion mode string, or null if it has never been set. */
+  public String getCompletionMode() {
+    return _completionMode;
+  }
+
+  /** Stores the completion mode string exactly as given; no validation happens here. */
+  public void setCompletionMode(String completionMode) {
+    _completionMode = completionMode;
+  }
+
+  /** Delegates to EqualityUtils: reference check, then class check, then the completion mode. */
+  @Override
+  public boolean equals(Object o) {
+    if (EqualityUtils.isSameReference(this, o)) {
+      return true;
+    }
+    if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+      return false;
+    }
+    return EqualityUtils.isEqual(_completionMode, ((CompletionConfig) o)._completionMode);
+  }
+
+  /** Hash is computed from the completion mode only, the same field equals compares. */
+  @Override
+  public int hashCode() {
+    return EqualityUtils.hashCodeOf(_completionMode);
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java
index e9eeef0..5f917da 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java
@@ -66,6 +66,9 @@ public class SegmentsValidationAndRetentionConfig {
   private ReplicaGroupStrategyConfig replicaGroupStrategyConfig;
 
   @NestedConfig
+  private CompletionConfig _completionConfig;
+
+  @NestedConfig
   private HllConfig hllConfig;
 
   // Number of replicas per partition of low-level consumers. This config is used for realtime tables only.
@@ -160,6 +163,14 @@ public class SegmentsValidationAndRetentionConfig {
     this.replicaGroupStrategyConfig = replicaGroupStrategyConfig;
   }
 
+  public CompletionConfig getCompletionConfig() {
+    return _completionConfig; // null when no completion config has been set
+  }
+
+  public void setCompletionConfig(CompletionConfig completionConfig) {
+    _completionConfig = completionConfig; // stored as given, without copying or validation
+  }
+
   public HllConfig getHllConfig() {
     return hllConfig;
   }
@@ -222,15 +233,15 @@ public class SegmentsValidationAndRetentionConfig {
 
     SegmentsValidationAndRetentionConfig that = (SegmentsValidationAndRetentionConfig) o;
 
-    return EqualityUtils.isEqual(retentionTimeUnit, that.retentionTimeUnit) && EqualityUtils
-        .isEqual(retentionTimeValue, that.retentionTimeValue) && EqualityUtils
-        .isEqual(segmentPushFrequency, that.segmentPushFrequency) && EqualityUtils
-        .isEqual(segmentPushType, that.segmentPushType) && EqualityUtils.isEqual(replication, that.replication)
-        && EqualityUtils.isEqual(schemaName, that.schemaName) && EqualityUtils
-        .isEqual(timeColumnName, that.timeColumnName) && EqualityUtils.isEqual(_timeType, that._timeType)
-        && EqualityUtils.isEqual(segmentAssignmentStrategy, that.segmentAssignmentStrategy) && EqualityUtils
-        .isEqual(replicaGroupStrategyConfig, that.replicaGroupStrategyConfig) && EqualityUtils
-        .isEqual(hllConfig, that.hllConfig) && EqualityUtils.isEqual(replicasPerPartition, that.replicasPerPartition);
+    return EqualityUtils.isEqual(retentionTimeUnit, that.retentionTimeUnit) && EqualityUtils.isEqual(retentionTimeValue,
+        that.retentionTimeValue) && EqualityUtils.isEqual(segmentPushFrequency, that.segmentPushFrequency)
+        && EqualityUtils.isEqual(segmentPushType, that.segmentPushType) && EqualityUtils.isEqual(replication,
+        that.replication) && EqualityUtils.isEqual(schemaName, that.schemaName) && EqualityUtils.isEqual(timeColumnName,
+        that.timeColumnName) && EqualityUtils.isEqual(_timeType, that._timeType) && EqualityUtils.isEqual(
+        segmentAssignmentStrategy, that.segmentAssignmentStrategy) && EqualityUtils.isEqual(replicaGroupStrategyConfig,
+        that.replicaGroupStrategyConfig) && EqualityUtils.isEqual(_completionConfig, that._completionConfig)
+        && EqualityUtils.isEqual(hllConfig, that.hllConfig) && EqualityUtils.isEqual(replicasPerPartition,
+        that.replicasPerPartition);
   }
 
   @Override
@@ -245,6 +256,7 @@ public class SegmentsValidationAndRetentionConfig {
     result = EqualityUtils.hashCodeOf(result, _timeType);
     result = EqualityUtils.hashCodeOf(result, segmentAssignmentStrategy);
     result = EqualityUtils.hashCodeOf(result, replicaGroupStrategyConfig);
+    result = EqualityUtils.hashCodeOf(result, _completionConfig);
     result = EqualityUtils.hashCodeOf(result, hllConfig);
     result = EqualityUtils.hashCodeOf(result, replicasPerPartition);
     return result;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index ff68450..693e331 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -320,6 +320,14 @@ public class CommonConstants {
         IN_PROGRESS, DONE
       }
 
+      /**
+       * During realtime segment completion, the value of this enum decides how non-winner servers should replace the completed segment.
+       */
+      public enum CompletionMode {
+        DEFAULT, // default behavior - if the in-memory segment in the non-winner server is equivalent to the committed segment, then build and replace, else download
+        DOWNLOAD // non-winner servers always download the segment, never build it
+      }
+
       public static final String STATUS = "segment.realtime.status";
     }
 
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java b/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java
index 3821094..94fbe39 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java
@@ -252,6 +252,21 @@ public class TableConfigTest {
       checkTableConfigWithAssignmentConfig(tableConfig, tableConfigToCompare);
     }
     {
+      CompletionConfig completionConfig = new CompletionConfig();
+      completionConfig.setCompletionMode("DEFAULT");
+
+      TableConfig tableConfig =
+          tableConfigBuilder.build();
+      tableConfig.getValidationConfig().setCompletionConfig(completionConfig);
+
+      // Serialize then de-serialize
+      TableConfig tableConfigToCompare = TableConfig.fromJsonConfig(tableConfig.toJsonConfig());
+      checkTableConfigWithCompletionConfig(tableConfig, tableConfigToCompare);
+
+      tableConfigToCompare = TableConfig.fromZnRecord(tableConfig.toZNRecord());
+      checkTableConfigWithCompletionConfig(tableConfig, tableConfigToCompare);
+    }
+    {
       // With default StreamConsumptionConfig
       TableConfig tableConfig = tableConfigBuilder.build();
       assertEquals(tableConfig.getIndexingConfig().getStreamConsumptionConfig().getStreamPartitionAssignmentStrategy(),
@@ -339,6 +354,19 @@ public class TableConfigTest {
     assertEquals(strategyConfig.getPartitionColumn(), "memberId");
   }
 
+  /**
+   * Asserts that {@code tableConfigToCompare} has the same table name and an equal, non-null
+   * completion config as {@code tableConfig}, and that its completion mode is "DEFAULT".
+   */
+  private void checkTableConfigWithCompletionConfig(TableConfig tableConfig, TableConfig tableConfigToCompare) {
+    assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
+    CompletionConfig actualCompletionConfig = tableConfigToCompare.getValidationConfig().getCompletionConfig();
+    assertNotNull(actualCompletionConfig);
+    assertEquals(actualCompletionConfig, tableConfig.getValidationConfig().getCompletionConfig());
+    // The completion mode itself must also carry the expected value.
+    assertEquals(actualCompletionConfig.getCompletionMode(), "DEFAULT");
+  }
+
   private void checkTableConfigWithStarTreeConfig(TableConfig tableConfig, TableConfig tableConfigToCompare)
       throws Exception {
     // Check that the segment assignment configuration does exist.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index e2cb15c..b5c9060 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.config.CompletionConfig;
 import org.apache.pinot.common.config.IndexingConfig;
 import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.config.TableConfig;
@@ -48,6 +49,7 @@ import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.CompletionMode;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -549,12 +551,20 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               break;
             case KEEP:
               _state = State.RETAINING;
-              success = buildSegmentAndReplace();
-              if (success) {
-                _state = State.RETAINED;
-              } else {
-                // Could not build segment for some reason. We can only download it.
-                _state = State.ERROR;
+              CompletionMode segmentCompletionMode = getSegmentCompletionMode();
+              switch (segmentCompletionMode) {
+                case DOWNLOAD:
+                  _state = State.DISCARDED;
+                  break;
+                case DEFAULT:
+                  success = buildSegmentAndReplace();
+                  if (success) {
+                    _state = State.RETAINED;
+                  } else {
+                    // Could not build segment for some reason. We can only download it.
+                    _state = State.ERROR;
+                  }
+                  break;
               }
               break;
             case COMMIT:
@@ -602,6 +612,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
   }
 
+  /**
+   * Resolves the completion mode for this table from its validation config. Returns
+   * {@link CompletionMode#DOWNLOAD} only when a completion config is present and its mode equals
+   * "DOWNLOAD" ignoring case; a missing config, a null mode or any other value yields
+   * {@link CompletionMode#DEFAULT}.
+   */
+  private CompletionMode getSegmentCompletionMode() {
+    CompletionConfig completionConfig = _tableConfig.getValidationConfig().getCompletionConfig();
+    boolean downloadRequested = completionConfig != null
+        && CompletionMode.DOWNLOAD.toString().equalsIgnoreCase(completionConfig.getCompletionMode());
+    return downloadRequested ? CompletionMode.DOWNLOAD : CompletionMode.DEFAULT;
+  }
+
   private File makeSegmentDirPath() {
     return new File(_resourceDataDir, _segmentZKMetadata.getSegmentName());
   }
@@ -912,27 +935,37 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         case CATCHING_UP:
         case HOLDING:
         case INITIAL_CONSUMING:
-          // Allow to catch up upto final offset, and then replace.
-          if (_currentOffset > endOffset) {
-            // We moved ahead of the offset that is committed in ZK.
-            segmentLogger.warn("Current offset {} ahead of the offset in zk {}. Downloading to replace", _currentOffset,
-                endOffset);
-            downloadSegmentAndReplace(llcMetadata);
-          } else if (_currentOffset == endOffset) {
-            segmentLogger
-                .info("Current offset {} matches offset in zk {}. Replacing segment", _currentOffset, endOffset);
-            buildSegmentAndReplace();
-          } else {
-            segmentLogger.info("Attempting to catch up from offset {} to {} ", _currentOffset, endOffset);
-            boolean success = catchupToFinalOffset(endOffset,
-                TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, TimeUnit.SECONDS));
-            if (success) {
-              segmentLogger.info("Caught up to offset {}", _currentOffset);
-              buildSegmentAndReplace();
-            } else {
-              segmentLogger.info("Could not catch up to offset (current = {}). Downloading to replace", _currentOffset);
+          CompletionMode segmentCompletionMode = getSegmentCompletionMode();
+          switch (segmentCompletionMode) {
+            case DOWNLOAD:
+              segmentLogger.info("State {}. CompletionMode {}. Downloading to replace", _state.toString(),
+                  segmentCompletionMode);
               downloadSegmentAndReplace(llcMetadata);
-            }
+              break;
+            case DEFAULT:
+              // Allow the consumer to catch up to the final offset, and then replace.
+              if (_currentOffset > endOffset) {
+                // We moved ahead of the offset that is committed in ZK.
+                segmentLogger.warn("Current offset {} ahead of the offset in zk {}. Downloading to replace", _currentOffset,
+                    endOffset);
+                downloadSegmentAndReplace(llcMetadata);
+              } else if (_currentOffset == endOffset) {
+                segmentLogger.info("Current offset {} matches offset in zk {}. Replacing segment", _currentOffset,
+                    endOffset);
+                buildSegmentAndReplace();
+              } else {
+                segmentLogger.info("Attempting to catch up from offset {} to {} ", _currentOffset, endOffset);
+                boolean success = catchupToFinalOffset(endOffset,
+                    TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, TimeUnit.SECONDS));
+                if (success) {
+                  segmentLogger.info("Caught up to offset {}", _currentOffset);
+                  buildSegmentAndReplace();
+                } else {
+                  segmentLogger.info("Could not catch up to offset (current = {}). Downloading to replace", _currentOffset);
+                  downloadSegmentAndReplace(llcMetadata);
+                }
+              }
+              break;
           }
           break;
         default:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org