You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/11/13 23:17:47 UTC

[GitHub] [incubator-pinot] yupeng9 commented on a change in pull request #6247: Add stream and batch to ingestionConfig

yupeng9 commented on a change in pull request #6247:
URL: https://github.com/apache/incubator-pinot/pull/6247#discussion_r523279240



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
##########
@@ -22,14 +22,17 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.utils.TimeUtils;
 
 
 // TODO: Consider break this config into multiple configs
 public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig {
   private String _retentionTimeUnit;
   private String _retentionTimeValue;
+  @Deprecated
   private String _segmentPushFrequency; // DO NOT REMOVE, this is used in internal segment generation management
+  @Deprecated

Review comment:
       Can you add docs pointing to the replacement for this deprecated field?

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
##########
@@ -29,6 +29,7 @@
 import org.apache.pinot.spi.config.BaseJsonConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;

Review comment:
       Is this import needed? It doesn't appear to be used.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
##########
@@ -179,9 +181,34 @@ private static void validateValidationConfig(TableConfig tableConfig, @Nullable
    * 4. validity of transform function string
    * 5. checks for source fields used in destination columns
    */
-  private static void validateIngestionConfig(@Nullable IngestionConfig ingestionConfig, @Nullable Schema schema) {
+  public static void validateIngestionConfig(String tableNameWithType, @Nullable IngestionConfig ingestionConfig,
+      @Nullable Schema schema) {
     if (ingestionConfig != null) {
 
+      // Batch
+      if (ingestionConfig.getBatchIngestionConfig() != null) {
+        List<Map<String, String>> batchConfigMaps = ingestionConfig.getBatchIngestionConfig().getBatchConfigMaps();
+        Preconditions.checkState(batchConfigMaps.size() > 0, "Could not find batch config map");
+        try {
+            // Validate that BatchConfig can be created
+            batchConfigMaps.forEach(b -> new BatchConfig(tableNameWithType, b));
+        } catch (Exception e) {
+          throw new IllegalStateException("Could not create BatchConfig using the batchConfig map", e);
+        }
+      }
+
+      // Stream
+      if (ingestionConfig.getStreamIngestionConfig() != null) {
+        List<Map<String, String>> streamConfigMaps = ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
+        Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream is supported in REALTIME table");

Review comment:
       Can we also check for, and disallow, this config coexisting with the legacy stream config in the index config?

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
##########
@@ -22,14 +22,17 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.utils.TimeUtils;
 
 
 // TODO: Consider break this config into multiple configs
 public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig {
   private String _retentionTimeUnit;
   private String _retentionTimeValue;
+  @Deprecated

Review comment:
       Can you add docs pointing to the replacement for this deprecated field?

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/util/IngestionUtilsTest.java
##########
@@ -22,17 +22,27 @@
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;

Review comment:
       Why is this not in the same module as `IngestionConfigUtils`, which lives in pinot-spi?

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/BatchIngestionConfig.java
##########
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table.ingestion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * Contains all configs related to the batch sources for ingestion.
+ */
+public class BatchIngestionConfig extends BaseJsonConfig {
+
+  @JsonPropertyDescription("Configs for all the batch sources to ingest from")
+  private final List<Map<String, String>> _batchConfigMaps;
+
+  @JsonPropertyDescription("Push type APPEND or REFRESH")
+  private final String _segmentPushType;
+
+  @JsonPropertyDescription("Push frequency HOURLY or DAILY")
+  private final String _segmentPushFrequency;
+
+  @JsonCreator
+  public BatchIngestionConfig(@JsonProperty("batchConfigMaps") List<Map<String, String>> batchConfigMaps,
+      @JsonProperty("segmentPushType") String segmentPushType,
+      @JsonProperty("segmentPushFrequency") String segmentPushFrequency) {
+    _batchConfigMaps = batchConfigMaps;
+    _segmentPushType = segmentPushType;
+    _segmentPushFrequency = segmentPushFrequency;
+  }
+
+  public List<Map<String, String>> getBatchConfigMaps() {
+    return _batchConfigMaps;
+  }
+
+  public String getSegmentPushType() {

Review comment:
       Shall we add `@JsonProperty` here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org