You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/02/11 04:12:44 UTC

[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6419: Compatibility test: streamOp

mcvsubbu commented on a change in pull request #6419:
URL: https://github.com/apache/incubator-pinot/pull/6419#discussion_r574240057



##########
File path: compatibility-verifier/compCheck.sh
##########
@@ -72,29 +72,31 @@ function checkoutAndBuild() {
   popd || exit 1
 }
 
-# Given a component and directory, start that version of the specific component 
+# Given a component and directory, start that version of the specific component
 function startService() {
   serviceName=$1
   dirName=$2
   # Upon start, save the pid of the process for a component into a file in /tmp/{component}.pid, which is then used to stop it
   pushd "$dirName"/pinot-tools/target/pinot-tools-pkg/bin  || exit 1
   if [ "$serviceName" = "zookeeper" ]; then
     sh -c 'echo $$ > $0/zookeeper.pid; exec ./pinot-admin.sh StartZookeeper' "${dirName}" &
-  elif [ "$serviceName" = "controller" ]; then 
+  elif [ "$serviceName" = "controller" ]; then
     sh -c 'echo $$ > $0/controller.pid; exec ./pinot-admin.sh StartController' "${dirName}" &
   elif [ "$serviceName" = "broker" ]; then
     sh -c 'echo $$ > $0/broker.pid; exec ./pinot-admin.sh StartBroker' "${dirName}" &
   elif [ "$serviceName" = "server" ]; then
     sh -c 'echo $$ > $0/server.pid; exec ./pinot-admin.sh StartServer' "${dirName}" &
-  fi 
+  elif [ "$serviceName" = "kafka" ]; then

Review comment:
       I have modified this file, so you need to merge.

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/StreamOp.java
##########
@@ -71,18 +118,170 @@ public void setInputDataFileName(String inputDataFileName) {
     _inputDataFileName = inputDataFileName;
   }
 
+  public String getTableConfigFileName() {
+    return _tableConfigFileName;
+  }
+
+  public void setTableConfigFileName(String tableConfigFileName) {
+    _tableConfigFileName = tableConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
   @Override
   boolean runOp() {
-    System.out.println("Produce rows into stream " + _streamConfigFileName + " and verify rows in tables "
-        + _tableConfigFileNames);
+    switch(_op) {
+      case CREATE:
+        return createKafkaTopic();
+      case PRODUCE:
+        return produceData();
+    }
     return true;
   }
 
-  public List<String> getTableConfigFileNames() {
-    return _tableConfigFileNames;
+  private boolean createKafkaTopic() {
+    try {
+      Properties streamConfigMap = JsonUtils.fileToObject(new File(_streamConfigFileName), Properties.class);
+      String topicName = streamConfigMap.getProperty("stream.kafka.topic.name");

Review comment:
       We can use `topicName` and `numPartitions` as the keys. We don't need to namespace them in the yaml file.

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/StreamOp.java
##########
@@ -71,18 +118,170 @@ public void setInputDataFileName(String inputDataFileName) {
     _inputDataFileName = inputDataFileName;
   }
 
+  public String getTableConfigFileName() {
+    return _tableConfigFileName;
+  }
+
+  public void setTableConfigFileName(String tableConfigFileName) {
+    _tableConfigFileName = tableConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
   @Override
   boolean runOp() {
-    System.out.println("Produce rows into stream " + _streamConfigFileName + " and verify rows in tables "
-        + _tableConfigFileNames);
+    switch(_op) {
+      case CREATE:
+        return createKafkaTopic();
+      case PRODUCE:
+        return produceData();
+    }
     return true;
   }
 
-  public List<String> getTableConfigFileNames() {
-    return _tableConfigFileNames;
+  private boolean createKafkaTopic() {
+    try {
+      Properties streamConfigMap = JsonUtils.fileToObject(new File(_streamConfigFileName), Properties.class);
+      String topicName = streamConfigMap.getProperty("stream.kafka.topic.name");
+      int partitions = Integer.parseInt(streamConfigMap.getProperty("stream.kafka.numPartitions"));
+
+      StreamDataServerStartable kafkaStarter;
+      Properties kafkaProperties = KafkaStarterUtils.getDefaultKafkaConfiguration();
+      kafkaProperties.put(ZOOKEEPER_CONNECT, streamConfigMap.getProperty("stream.kafka.zk.broker.url"));

Review comment:
       Do we need this in `streamConfigMap`? We start ZooKeeper on port 2181, so we can just use "localhost:2181/kafka" (since you start Kafka under that namespace).

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/StreamOp.java
##########
@@ -71,18 +118,170 @@ public void setInputDataFileName(String inputDataFileName) {
     _inputDataFileName = inputDataFileName;
   }
 
+  public String getTableConfigFileName() {
+    return _tableConfigFileName;
+  }
+
+  public void setTableConfigFileName(String tableConfigFileName) {
+    _tableConfigFileName = tableConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
   @Override
   boolean runOp() {
-    System.out.println("Produce rows into stream " + _streamConfigFileName + " and verify rows in tables "
-        + _tableConfigFileNames);
+    switch(_op) {
+      case CREATE:
+        return createKafkaTopic();
+      case PRODUCE:
+        return produceData();
+    }
     return true;
   }
 
-  public List<String> getTableConfigFileNames() {
-    return _tableConfigFileNames;
+  private boolean createKafkaTopic() {
+    try {
+      Properties streamConfigMap = JsonUtils.fileToObject(new File(_streamConfigFileName), Properties.class);
+      String topicName = streamConfigMap.getProperty("stream.kafka.topic.name");
+      int partitions = Integer.parseInt(streamConfigMap.getProperty("stream.kafka.numPartitions"));
+
+      StreamDataServerStartable kafkaStarter;
+      Properties kafkaProperties = KafkaStarterUtils.getDefaultKafkaConfiguration();
+      kafkaProperties.put(ZOOKEEPER_CONNECT, streamConfigMap.getProperty("stream.kafka.zk.broker.url"));
+
+      kafkaStarter  = StreamDataProvider
+          .getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, kafkaProperties);
+      kafkaStarter.createTopic(topicName, KafkaStarterUtils.getTopicCreationProps(partitions));
+    } catch (Exception e) {
+      LOGGER.warn("Failed to create Kafka topic with stream config file: {}", _streamConfigFileName, e);
+      return false;
+    }
+    return true;
+  }
+
+
+  private boolean produceData() {
+    try {
+      // get kafka topic
+      Properties streamConfigMap = JsonUtils.fileToObject(new File(_streamConfigFileName), Properties.class);
+      String topicName = streamConfigMap.getProperty("stream.kafka.topic.name");
+      String partitionColumn = streamConfigMap.getProperty("stream.kafka.partitionColumn");

Review comment:
       `partitionColumn` should be good enough.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org