You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/11/03 01:10:26 UTC

[incubator-pinot] branch master updated: Adding bootstrap table command and move quickstart to use it (#6220)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d19d604  Adding bootstrap table command and move quickstart to use it (#6220)
d19d604 is described below

commit d19d604a4d007e06def26b41f50706ad485ea314
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Mon Nov 2 17:07:55 2020 -0800

    Adding bootstrap table command and move quickstart to use it (#6220)
---
 .../org/apache/pinot/tools/BootstrapTableTool.java | 132 +++++++++++++++++++++
 .../apache/pinot/tools/GitHubEventsQuickstart.java |   6 +-
 .../org/apache/pinot/tools/HybridQuickstart.java   |  44 +++----
 .../java/org/apache/pinot/tools/Quickstart.java    |  24 ++--
 .../apache/pinot/tools/QuickstartTableRequest.java |  79 ++++++------
 .../org/apache/pinot/tools/RealtimeQuickStart.java |  16 +--
 .../org/apache/pinot/tools/UpsertQuickStart.java   |  15 ++-
 .../pinot/tools/admin/PinotAdministrator.java      |   2 +
 .../tools/admin/command/BootstrapTableCommand.java | 113 ++++++++++++++++++
 .../tools/admin/command/QuickstartRunner.java      |  19 ++-
 10 files changed, 348 insertions(+), 102 deletions(-)

diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
new file mode 100644
index 0000000..e795390
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.Reader;
+import java.net.URL;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.tools.admin.command.AddTableCommand;
+import org.apache.pinot.tools.utils.JarUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+
+/**
+ * Bootstraps a Pinot table from a local directory laid out like the bundled examples. The directory name is used as
+ * the table name, and the directory holds:
+ * <ul>
+ *   <li>{@code <table>_schema.json} (required)</li>
+ *   <li>{@code <table>_offline_table_config.json}, optionally with {@code ingestionJobSpec.yaml}</li>
+ *   <li>{@code <table>_realtime_table_config.json}</li>
+ * </ul>
+ * At least one table config is required; if both exist, both the offline and the realtime table are created.
+ */
+public class BootstrapTableTool {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapTableTool.class);
+  private final String _controllerHost;
+  private final int _controllerPort;
+  private final String _tableDir;
+
+  /**
+   * @param controllerHost host of the Pinot controller to create the table(s) on
+   * @param controllerPort HTTP port of the Pinot controller
+   * @param tableDir directory holding the schema, table config(s) and optional ingestion job spec of one table
+   */
+  public BootstrapTableTool(String controllerHost, int controllerPort, String tableDir) {
+    _controllerHost = controllerHost;
+    _controllerPort = controllerPort;
+    _tableDir = tableDir;
+  }
+
+  /**
+   * Creates the offline and/or realtime table found in the table directory and, for an offline table with an
+   * {@code ingestionJobSpec.yaml}, runs that ingestion job.
+   *
+   * @return always {@code true}; failures are reported by throwing
+   * @throws IllegalArgumentException if the table directory is {@code null}
+   * @throws RuntimeException if the schema or both table configs are missing, or a table cannot be created
+   */
+  public boolean execute()
+      throws Exception {
+    if (_tableDir == null) {
+      throw new IllegalArgumentException("Table directory to bootstrap must not be null");
+    }
+    // Unique scratch dir for ingestion input extracted from a jar; removed once bootstrapping finishes or fails.
+    File setupTableTmpDir = Files.createTempDirectory("bootstrapTable-").toFile();
+    try {
+      return bootstrapTables(setupTableTmpDir);
+    } finally {
+      FileUtils.deleteQuietly(setupTableTmpDir);
+    }
+  }
+
+  /** Creates every table whose config file exists in the table directory. */
+  private boolean bootstrapTables(File setupTableTmpDir)
+      throws Exception {
+    // Normalize so that inputs such as "." or "./" still yield the directory's real name as the table name.
+    File tableDir = new File(_tableDir).toPath().toAbsolutePath().normalize().toFile();
+    String tableName = tableDir.getName();
+    File schemaFile = new File(tableDir, String.format("%s_schema.json", tableName));
+    if (!schemaFile.exists()) {
+      throw new RuntimeException(
+          "Unable to find schema file for table - " + tableName + ", at " + schemaFile.getAbsolutePath());
+    }
+    boolean tableCreated = false;
+    File offlineTableConfigFile = new File(tableDir, String.format("%s_offline_table_config.json", tableName));
+    if (offlineTableConfigFile.exists()) {
+      File ingestionJobSpecFile = new File(tableDir, "ingestionJobSpec.yaml");
+      tableCreated =
+          bootstrapOfflineTable(setupTableTmpDir, tableName, schemaFile, offlineTableConfigFile, ingestionJobSpecFile);
+    }
+    File realtimeTableConfigFile = new File(tableDir, String.format("%s_realtime_table_config.json", tableName));
+    if (realtimeTableConfigFile.exists()) {
+      tableCreated |= bootstrapRealtimeTable(tableName, schemaFile, realtimeTableConfigFile);
+    }
+    if (!tableCreated) {
+      throw new RuntimeException(String
+          .format("Unable to find config files for table - %s, at location [%s] or [%s].", tableName,
+              offlineTableConfigFile.getAbsolutePath(), realtimeTableConfigFile.getAbsolutePath()));
+    }
+    return true;
+  }
+
+  /** Creates the realtime table, throwing if table creation reports failure. */
+  private boolean bootstrapRealtimeTable(String tableName, File schemaFile, File realtimeTableConfigFile)
+      throws Exception {
+    LOGGER.info("Adding realtime table {}", tableName);
+    if (!createTable(schemaFile, realtimeTableConfigFile)) {
+      throw new RuntimeException(String
+          .format("Unable to create realtime table - %s from schema file [%s] and table conf file [%s].", tableName,
+              schemaFile, realtimeTableConfigFile));
+    }
+    return true;
+  }
+
+  /** Adds the schema and table config through {@link AddTableCommand}; returns whether that command succeeded. */
+  private boolean createTable(File schemaFile, File tableConfigFile)
+      throws Exception {
+    return new AddTableCommand().setSchemaFile(schemaFile.getAbsolutePath())
+        .setTableConfigFile(tableConfigFile.getAbsolutePath()).setControllerHost(_controllerHost)
+        .setControllerPort(String.valueOf(_controllerPort)).setExecute(true).execute();
+  }
+
+  /**
+   * Creates the offline table and, when {@code ingestionJobSpecFile} exists, runs that ingestion job to build index
+   * segments for the table and push them to the controller.
+   */
+  private boolean bootstrapOfflineTable(File setupTableTmpDir, String tableName, File schemaFile,
+      File offlineTableConfigFile, File ingestionJobSpecFile)
+      throws Exception {
+    LOGGER.info("Adding offline table: {}", tableName);
+    if (!createTable(schemaFile, offlineTableConfigFile)) {
+      throw new RuntimeException(String
+          .format("Unable to create offline table - %s from schema file [%s] and table conf file [%s].", tableName,
+              schemaFile, offlineTableConfigFile));
+    }
+
+    if (!ingestionJobSpecFile.exists()) {
+      LOGGER.info("Not found ingestionJobSpec.yaml at location [{}], skipping data ingestion",
+          ingestionJobSpecFile.getAbsolutePath());
+      return true;
+    }
+    LOGGER.info("Launch data ingestion job to build index segment for table {} and push to controller [{}:{}]",
+        tableName, _controllerHost, _controllerPort);
+    SegmentGenerationJobSpec spec;
+    try (Reader reader = new BufferedReader(new FileReader(ingestionJobSpecFile.getAbsolutePath()))) {
+      spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class);
+    }
+    resolveInputDirURI(spec, setupTableTmpDir);
+    IngestionJobLauncher.runIngestionJob(spec);
+    return true;
+  }
+
+  /**
+   * If the spec's input dir is not on the local file system but is a class path resource, points the spec at it.
+   * Resources inside a jar are first copied to {@code <setupTableTmpDir>/inputData}. Any other input dir (e.g. a
+   * URI with a file system scheme) is left unchanged for the ingestion job to resolve.
+   */
+  private static void resolveInputDirURI(SegmentGenerationJobSpec spec, File setupTableTmpDir)
+      throws Exception {
+    String inputDirURI = spec.getInputDirURI();
+    if (inputDirURI == null) {
+      throw new RuntimeException("Ingestion job spec does not set 'inputDirURI'");
+    }
+    if (new File(inputDirURI).exists()) {
+      return;
+    }
+    URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI);
+    if (resolvedInputDirURI == null) {
+      LOGGER.info("Input dir [{}] not found locally or on the class path, passing it to the ingestion job as is",
+          inputDirURI);
+      return;
+    }
+    if (resolvedInputDirURI.getProtocol().equals("jar")) {
+      // getFile() of a jar URL is "<jar file URL>!/<entry path>"; the leading '/' of the entry path is dropped.
+      String[] splits = resolvedInputDirURI.getFile().split("!");
+      String inputDir = new File(setupTableTmpDir, "inputData").toString();
+      JarUtils.copyResourcesToDirectory(splits[0], splits[1].substring(1), inputDir);
+      spec.setInputDirURI(inputDir);
+    } else {
+      spec.setInputDirURI(resolvedInputDirURI.toString());
+    }
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
index 0096a4b..09d7522 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
@@ -62,7 +62,7 @@ public class GitHubEventsQuickstart {
 
   public void execute(String personalAccessToken)
       throws Exception {
-    final File quickStartDataDir = new File("githubEvents" + System.currentTimeMillis());
+    final File quickStartDataDir = new File(new File("githubEvents-" + System.currentTimeMillis()), "pullRequestMergedEvents");
 
     if (!quickStartDataDir.exists()) {
       Preconditions.checkState(quickStartDataDir.mkdirs());
@@ -82,7 +82,7 @@ public class GitHubEventsQuickstart {
 
     File tempDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
     Preconditions.checkState(tempDir.mkdirs());
-    QuickstartTableRequest request = new QuickstartTableRequest("pullRequestMergedEvents", schemaFile, tableConfigFile);
+    QuickstartTableRequest request = new QuickstartTableRequest(quickStartDataDir.getAbsolutePath());
     final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, tempDir);
 
     printStatus(Color.CYAN, "***** Starting Kafka *****");
@@ -92,7 +92,7 @@ public class GitHubEventsQuickstart {
     runner.startAll();
 
     printStatus(Color.CYAN, "***** Adding pullRequestMergedEvents table *****");
-    runner.addTable();
+    runner.bootstrapTable();
 
     printStatus(Color.CYAN, "***** Starting pullRequestMergedEvents data stream and publishing to Kafka *****");
     final PullRequestMergedEventsStream pullRequestMergedEventsStream =
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index d682218..4d785ee 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -27,7 +27,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.spi.stream.StreamDataServerStartable;
@@ -57,12 +56,11 @@ public class HybridQuickstart {
     new HybridQuickstart().execute();
   }
 
-  private QuickstartTableRequest prepareOfflineTableRequest(File configDir)
+  private QuickstartTableRequest prepareTableRequest(File baseDir)
       throws IOException {
-
-    _schemaFile = new File(configDir, "airlineStats_schema.json");
-    _ingestionJobSpecFile = new File(configDir, "ingestionJobSpec.yaml");
-    File tableConfigFile = new File(configDir, "airlineStats_offline_table_config.json");
+    _schemaFile = new File(baseDir, "airlineStats_schema.json");
+    _ingestionJobSpecFile = new File(baseDir, "ingestionJobSpec.yaml");
+    File tableConfigFile = new File(baseDir, "airlineStats_offline_table_config.json");
 
     ClassLoader classLoader = Quickstart.class.getClassLoader();
     URL resource = classLoader.getResource("examples/batch/airlineStats/airlineStats_schema.json");
@@ -75,26 +73,18 @@ public class HybridQuickstart {
     Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, tableConfigFile);
 
-    return new QuickstartTableRequest("airlineStats", _schemaFile, tableConfigFile, _ingestionJobSpecFile,
-        FileFormat.AVRO);
-  }
-
-  private QuickstartTableRequest prepareRealtimeTableRequest(File configDir)
-      throws IOException {
-
-    _dataFile = new File(configDir, "airlineStats_data.avro");
-    _realtimeTableConfigFile = new File(configDir, "airlineStats_realtime_table_config.json");
-
-    URL resource = Quickstart.class.getClassLoader()
+    _realtimeTableConfigFile = new File(baseDir, "airlineStats_realtime_table_config.json");
+    resource = Quickstart.class.getClassLoader()
         .getResource("examples/stream/airlineStats/airlineStats_realtime_table_config.json");
     Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, _realtimeTableConfigFile);
     resource = Quickstart.class.getClassLoader()
         .getResource("examples/stream/airlineStats/sample_data/airlineStats_data.avro");
     Preconditions.checkNotNull(resource);
+    _dataFile = new File(baseDir, "airlineStats_data.avro");
     FileUtils.copyURLToFile(resource, _dataFile);
 
-    return new QuickstartTableRequest("airlineStats", _schemaFile, _realtimeTableConfigFile);
+    return new QuickstartTableRequest(baseDir.getAbsolutePath());
   }
 
   private void startKafka() {
@@ -111,16 +101,13 @@ public class HybridQuickstart {
 
   public void execute()
       throws Exception {
-
     File quickstartTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
-    File configDir = new File(quickstartTmpDir, "configs");
-    File dataDir = new File(quickstartTmpDir, "data");
-    Preconditions.checkState(configDir.mkdirs());
+    File baseDir = new File(quickstartTmpDir, "airlineStats");
+    File dataDir = new File(baseDir, "data");
     Preconditions.checkState(dataDir.mkdirs());
-    QuickstartTableRequest offlineRequest = prepareOfflineTableRequest(configDir);
-    QuickstartTableRequest realtimeTableRequest = prepareRealtimeTableRequest(configDir);
+    QuickstartTableRequest bootstrapTableRequest = prepareTableRequest(baseDir);
     final QuickstartRunner runner =
-        new QuickstartRunner(Lists.newArrayList(offlineRequest, realtimeTableRequest), 1, 1, 1, dataDir);
+        new QuickstartRunner(Lists.newArrayList(bootstrapTableRequest), 1, 1, 1, dataDir);
     printStatus(Color.YELLOW, "***** Starting Kafka  *****");
     startKafka();
     printStatus(Color.YELLOW, "***** Starting airline data stream and publishing to Kafka *****");
@@ -142,11 +129,8 @@ public class HybridQuickstart {
         e.printStackTrace();
       }
     }));
-    printStatus(Color.YELLOW, "***** Adding airlineStats offline and realtime table *****");
-    runner.addTable();
-    printStatus(Color.YELLOW,
-        "***** Launch data ingestion job to build index segments for airlineStats and push to controller *****");
-    runner.launchDataIngestionJob();
+    printStatus(Color.YELLOW, "***** Bootstrap airlineStats offline and realtime table *****");
+    runner.bootstrapTable();
 
     printStatus(Color.YELLOW, "***** Pinot Hybrid with hybrid table setup is complete *****");
     printStatus(Color.YELLOW, "***** Sequence of operations *****");
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
index b28481a..69f5941 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
 import java.io.File;
 import java.net.URL;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 
@@ -135,15 +134,14 @@ public class Quickstart {
   public void execute()
       throws Exception {
     File quickstartTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
-    File configDir = new File(quickstartTmpDir, "configs");
-    File dataDir = new File(quickstartTmpDir, "data");
-    Preconditions.checkState(configDir.mkdirs());
+    File baseDir = new File(quickstartTmpDir, "baseballStats");
+    File dataDir = new File(baseDir, "rawdata");
     Preconditions.checkState(dataDir.mkdirs());
 
-    File schemaFile = new File(configDir, "baseballStats_schema.json");
-    File dataFile = new File(configDir, "baseballStats_data.csv");
-    File tableConfigFile = new File(configDir, "baseballStats_offline_table_config.json");
-    File ingestionJobSpecFile = new File(configDir, "ingestionJobSpec.yaml");
+    File schemaFile = new File(baseDir, "baseballStats_schema.json");
+    File tableConfigFile = new File(baseDir, "baseballStats_offline_table_config.json");
+    File ingestionJobSpecFile = new File(baseDir, "ingestionJobSpec.yaml");
+    File dataFile = new File(dataDir, "baseballStats_data.csv");
 
     ClassLoader classLoader = Quickstart.class.getClassLoader();
     URL resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_schema.json");
@@ -159,8 +157,7 @@ public class Quickstart {
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, tableConfigFile);
 
-    QuickstartTableRequest request =
-        new QuickstartTableRequest("baseballStats", schemaFile, tableConfigFile, ingestionJobSpecFile, FileFormat.CSV);
+    QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
     final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir);
 
     printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****");
@@ -174,10 +171,9 @@ public class Quickstart {
         e.printStackTrace();
       }
     }));
-    printStatus(Color.CYAN, "***** Adding baseballStats table *****");
-    runner.addTable();
-    printStatus(Color.CYAN, "***** Launch data ingestion job to build index segment for baseballStats and push to controller *****");
-    runner.launchDataIngestionJob();
+    printStatus(Color.CYAN, "***** Bootstrap baseballStats table *****");
+    runner.bootstrapTable();
+
     printStatus(Color.CYAN, "***** Waiting for 5 seconds for the server to fetch the assigned segment *****");
     Thread.sleep(5000);
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
index 117ccf1..fcaa32f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
@@ -20,80 +20,87 @@ package org.apache.pinot.tools;
 
 import java.io.File;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.readers.FileFormat;
 
 
 public class QuickstartTableRequest {
 
-  File schemaFile;
-  File tableRequestFile;
-  File ingestionJobFile;
-  TableType tableType;
-  String tableName;
-  FileFormat segmentFileFormat = FileFormat.CSV;
+  private String _tableName;
+  private TableType _tableType;
+  private File _schemaFile;
+  private File _tableRequestFile;
+  private File _ingestionJobFile;
+  private String _bootstrapTableDir;
 
-  public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest, File ingestionJobFile,
-      FileFormat segmentFileFormat) {
-    this.tableName = tableName;
-    this.schemaFile = schemaFile;
-    this.tableRequestFile = tableRequest;
-    tableType = TableType.OFFLINE;
-    this.segmentFileFormat = segmentFileFormat;
-    this.ingestionJobFile = ingestionJobFile;
+  public QuickstartTableRequest(String bootstrapTableDir) {
+    this._bootstrapTableDir = bootstrapTableDir;
   }
 
-  public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest) {
-    this.tableName = tableName;
-    this.schemaFile = schemaFile;
-    this.tableRequestFile = tableRequest;
-    tableType = TableType.REALTIME;
-  }
-
-  public FileFormat getSegmentFileFormat() {
-    return segmentFileFormat;
+  public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest, File ingestionJobFile) {
+    this._tableName = tableName;
+    this._schemaFile = schemaFile;
+    this._tableRequestFile = tableRequest;
+    _tableType = TableType.OFFLINE;
+    this._ingestionJobFile = ingestionJobFile;
   }
 
-  public void setSegmentFileFormat(FileFormat segmentFileFormat) {
-    this.segmentFileFormat = segmentFileFormat;
+  public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest) {
+    this._tableName = tableName;
+    this._schemaFile = schemaFile;
+    this._tableRequestFile = tableRequest;
+    _tableType = TableType.REALTIME;
   }
 
   public File getSchemaFile() {
-    return schemaFile;
+    return _schemaFile;
   }
 
   public void setSchemaFile(File schemaFile) {
-    this.schemaFile = schemaFile;
+    this._schemaFile = schemaFile;
   }
 
   public File getTableRequestFile() {
-    return tableRequestFile;
+    return _tableRequestFile;
   }
 
   public void setTableRequestFile(File tableRequestFile) {
-    this.tableRequestFile = tableRequestFile;
+    this._tableRequestFile = tableRequestFile;
   }
 
   public File getIngestionJobFile() {
-    return ingestionJobFile;
+    return _ingestionJobFile;
   }
 
   public void setIngestionJobFile(File ingestionJobFile) {
-    this.ingestionJobFile = ingestionJobFile;
+    this._ingestionJobFile = ingestionJobFile;
   }
 
   public TableType getTableType() {
-    return tableType;
+    return _tableType;
   }
 
   public void setTableType(TableType tableType) {
-    this.tableType = tableType;
+    this._tableType = tableType;
   }
 
   public String getTableName() {
-    return tableName;
+    return _tableName;
   }
 
   public void setTableName(String tableName) {
-    this.tableName = tableName;
+    this._tableName = tableName;
+  }
+
+  public String getBootstrapTableDir() {
+    return _bootstrapTableDir;
+  }
+
+  public void setBootstrapTableDir(String bootstrapTableDir) {
+    this._bootstrapTableDir = bootstrapTableDir;
+  }
+
+  public String toString() {
+    return "{ tableName = " + _tableName + ", tableType = " + _tableType + ", schemaFile = " + _schemaFile
+        + ", tableRequestFile = " + _tableRequestFile + ", ingestionJobFile = " + _ingestionJobFile
+        + ", bootstrapTableDir = " + _bootstrapTableDir + " }";
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 0af2b7c..2b1f9a0 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -48,13 +48,13 @@ public class RealtimeQuickStart {
   public void execute()
       throws Exception {
     File quickstartTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
-    File configDir = new File(quickstartTmpDir, "configs");
-    File dataDir = new File(quickstartTmpDir, "data");
-    Preconditions.checkState(configDir.mkdirs());
+
+    File baseDir = new File(quickstartTmpDir, "meetupRsvp");
+    File dataDir = new File(baseDir, "rawdata");
     Preconditions.checkState(dataDir.mkdirs());
 
-    File schemaFile = new File(configDir, "meetupRsvp_schema.json");
-    File tableConfigFile = new File(configDir, "meetupRsvp_realtime_table_config.json");
+    File schemaFile = new File(baseDir, "meetupRsvp_schema.json");
+    File tableConfigFile = new File(baseDir, "meetupRsvp_realtime_table_config.json");
 
     ClassLoader classLoader = Quickstart.class.getClassLoader();
     URL resource = classLoader.getResource("examples/stream/meetupRsvp/meetupRsvp_schema.json");
@@ -64,7 +64,7 @@ public class RealtimeQuickStart {
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, tableConfigFile);
 
-    QuickstartTableRequest request = new QuickstartTableRequest("meetupRsvp", schemaFile, tableConfigFile);
+    QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
     final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir);
 
     printStatus(Color.CYAN, "***** Starting Kafka *****");
@@ -93,8 +93,8 @@ public class RealtimeQuickStart {
         e.printStackTrace();
       }
     }));
-    printStatus(Color.CYAN, "***** Adding meetupRSVP table *****");
-    runner.addTable();
+    printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****");
+    runner.bootstrapTable();
     printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****");
     Thread.sleep(5000);
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
index a457cac..e57174c 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
@@ -48,13 +48,12 @@ public class UpsertQuickStart {
   public void execute()
       throws Exception {
     File quickstartTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
-    File configDir = new File(quickstartTmpDir, "configs");
-    File dataDir = new File(quickstartTmpDir, "data");
-    Preconditions.checkState(configDir.mkdirs());
+    File bootstrapTableDir = new File(quickstartTmpDir, "meetupRsvp");
+    File dataDir = new File(bootstrapTableDir, "data");
     Preconditions.checkState(dataDir.mkdirs());
 
-    File schemaFile = new File(configDir, "upsert_meetupRsvp_schema.json");
-    File tableConfigFile = new File(configDir, "upsert_meetupRsvp_realtime_table_config.json");
+    File schemaFile = new File(bootstrapTableDir, "meetupRsvp_schema.json");
+    File tableConfigFile = new File(bootstrapTableDir, "meetupRsvp_realtime_table_config.json");
 
     ClassLoader classLoader = Quickstart.class.getClassLoader();
     URL resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json");
@@ -64,7 +63,7 @@ public class UpsertQuickStart {
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, tableConfigFile);
 
-    QuickstartTableRequest request = new QuickstartTableRequest("meetupRsvp", schemaFile, tableConfigFile);
+    QuickstartTableRequest request = new QuickstartTableRequest(bootstrapTableDir.getAbsolutePath());
     final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir);
 
     printStatus(Color.CYAN, "***** Starting Kafka *****");
@@ -94,8 +93,8 @@ public class UpsertQuickStart {
         e.printStackTrace();
       }
     }));
-    printStatus(Color.CYAN, "***** Adding meetupRSVP(upsert) table *****");
-    runner.addTable();
+    printStatus(Color.CYAN, "***** Bootstrap meetupRSVP(upsert) table *****");
+    runner.bootstrapTable();
     printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****");
     Thread.sleep(5000);
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
index cdd620f..83c90da 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
@@ -41,6 +41,7 @@ import org.apache.pinot.tools.admin.command.QuickStartCommand;
 import org.apache.pinot.tools.admin.command.RealtimeProvisioningHelperCommand;
 import org.apache.pinot.tools.admin.command.RebalanceTableCommand;
 import org.apache.pinot.tools.admin.command.SegmentProcessorFrameworkCommand;
+import org.apache.pinot.tools.admin.command.BootstrapTableCommand;
 import org.apache.pinot.tools.admin.command.ShowClusterInfoCommand;
 import org.apache.pinot.tools.admin.command.StartBrokerCommand;
 import org.apache.pinot.tools.admin.command.StartControllerCommand;
@@ -127,6 +128,7 @@ public class PinotAdministrator {
       @SubCommand(name = "AnonymizeData", impl = AnonymizeDataCommand.class),
       @SubCommand(name = "GitHubEventsQuickStart", impl = GitHubEventsQuickStartCommand.class),
       @SubCommand(name = "StreamGitHubEvents", impl = StreamGitHubEventsCommand.class),
+      @SubCommand(name = "BootstrapTable", impl = BootstrapTableCommand.class),
       @SubCommand(name = "SegmentProcessorFramework", impl = SegmentProcessorFrameworkCommand.class)
   })
   Command _subCommand;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
new file mode 100644
index 0000000..7d725fc
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.admin.command;
+
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.tools.BootstrapTableTool;
+import org.apache.pinot.tools.Command;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The command to bootstrap a Pinot table from a directory with table schema/config/ingestionJobSpec/raw data files.
+ *
+ * <p>Sample usage:
+ * {@code pinot-admin.sh BootstrapTable -dir <path-to-table-configs-directory>}
+ *
+ * <p>The directory name is used as the table name, and the layout follows the current example conventions.
+ * For an offline table:
+ * <pre>{@code
+ * <table_name>/
+ * <table_name>/<table_name>_schema.json
+ * <table_name>/<table_name>_offline_table_config.json
+ * <table_name>/ingestionJobSpec.yaml
+ * <table_name>/rawdata/...
+ * }</pre>
+ *
+ * For a realtime table:
+ * <pre>{@code
+ * <table_name>/
+ * <table_name>/<table_name>_schema.json
+ * <table_name>/<table_name>_realtime_table_config.json
+ * }</pre>
+ *
+ * For a hybrid table:
+ * <pre>{@code
+ * <table_name>/
+ * <table_name>/<table_name>_schema.json
+ * <table_name>/<table_name>_offline_table_config.json
+ * <table_name>/<table_name>_realtime_table_config.json
+ * <table_name>/ingestionJobSpec.yaml
+ * <table_name>/rawdata/...
+ * }</pre>
+ *
+ * <p>{@code ingestionJobSpec.yaml} is optional; without it the offline table is created but no data is ingested.
+ * {@code rawdata/} is only a convention: the ingestion job reads whatever input dir the spec points to.
+ */
+public class BootstrapTableCommand extends AbstractBaseAdminCommand implements Command {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapTableCommand.class.getName());
+
+  @Option(name = "-controllerHost", required = false, metaVar = "<String>", usage = "host name for controller.")
+  private String _controllerHost;
+
+  // Option fields must not be final: javac inlines reads of a final field initialized with a compile-time constant,
+  // so a value injected by args4j would never be seen.
+  @Option(name = "-controllerPort", required = false, metaVar = "<int>", usage = "http port for controller.")
+  private String _controllerPort = DEFAULT_CONTROLLER_PORT;
+
+  @Option(name = "-dir", required = false, aliases = {"-d", "-directory"}, metaVar = "<String>",
+      usage = "The directory that contains all the configs and data to bootstrap a table")
+  private String _dir;
+
+  @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"},
+      usage = "Print this message.")
+  private boolean _help = false;
+
+  @Override
+  public boolean getHelp() {
+    return _help;
+  }
+
+  @Override
+  public String getName() {
+    return "BootstrapTable";
+  }
+
+  public BootstrapTableCommand setControllerHost(String controllerHost) {
+    _controllerHost = controllerHost;
+    return this;
+  }
+
+  public BootstrapTableCommand setControllerPort(String controllerPort) {
+    _controllerPort = controllerPort;
+    return this;
+  }
+
+  public BootstrapTableCommand setDir(String dir) {
+    _dir = dir;
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return "BootstrapTable -controllerHost " + _controllerHost + " -controllerPort " + _controllerPort + " -dir "
+        + _dir;
+  }
+
+  @Override
+  public void cleanup() {
+    // No resources are held by this command.
+  }
+
+  @Override
+  public String description() {
+    return "Run Pinot Bootstrap Table.";
+  }
+
+  /**
+   * Initializes the plugin manager and bootstraps the table found in {@code -dir} on the given controller.
+   *
+   * @throws IllegalArgumentException if {@code -dir} was not given
+   */
+  @Override
+  public boolean execute()
+      throws Exception {
+    if (_dir == null) {
+      throw new IllegalArgumentException("Missing required option: -dir");
+    }
+    LOGGER.info("Executing command: {}", this);
+    PluginManager.get().init();
+    return new BootstrapTableTool(_controllerHost, Integer.parseInt(_controllerPort), _dir).execute();
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index fd8aa21..e2e3e38 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.Reader;
+import java.net.InetAddress;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.QuickstartTableRequest;
+import org.apache.pinot.tools.BootstrapTableTool;
 import org.apache.pinot.tools.utils.JarUtils;
 import org.yaml.snakeyaml.Yaml;
 
@@ -169,6 +171,17 @@ public class QuickstartRunner {
         .setInstances(number).setRole(TenantRole.BROKER).setExecute(true).execute();
   }
 
+  public void bootstrapTable()
+      throws Exception {
+    for (QuickstartTableRequest tableRequest : _tableRequests) {
+      String host = InetAddress.getLocalHost().getHostName();
+      if (!new BootstrapTableTool(host, _controllerPorts.get(0), tableRequest.getBootstrapTableDir()).execute()) {
+        throw new RuntimeException("Failed to bootstrap table with request - " + tableRequest);
+      }
+    }
+  }
+
+  @Deprecated
   public void addTable()
       throws Exception {
     for (QuickstartTableRequest request : _tableRequests) {
@@ -178,6 +191,7 @@ public class QuickstartRunner {
     }
   }
 
+  @Deprecated
   public void launchDataIngestionJob()
       throws Exception {
     for (QuickstartTableRequest request : _tableRequests) {
@@ -205,8 +219,7 @@ public class QuickstartRunner {
   public JsonNode runQuery(String query)
       throws Exception {
     int brokerPort = _brokerPorts.get(RANDOM.nextInt(_brokerPorts.size()));
-    return JsonUtils
-        .stringToJsonNode(new PostQueryCommand().setBrokerPort(String.valueOf(brokerPort)).setQueryType(
-            CommonConstants.Broker.Request.SQL).setQuery(query).run());
+    return JsonUtils.stringToJsonNode(new PostQueryCommand().setBrokerPort(String.valueOf(brokerPort))
+        .setQueryType(CommonConstants.Broker.Request.SQL).setQuery(query).run());
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org