You are viewing a plain-text version of this content; the link to its canonical version is not preserved in this view.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/11/03 01:10:26 UTC
[incubator-pinot] branch master updated: Adding bootstrap table command and move quickstart to use it (#6220)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d19d604 Adding bootstrap table command and move quickstart to use it (#6220)
d19d604 is described below
commit d19d604a4d007e06def26b41f50706ad485ea314
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Mon Nov 2 17:07:55 2020 -0800
Adding bootstrap table command and move quickstart to use it (#6220)
---
.../org/apache/pinot/tools/BootstrapTableTool.java | 132 +++++++++++++++++++++
.../apache/pinot/tools/GitHubEventsQuickstart.java | 6 +-
.../org/apache/pinot/tools/HybridQuickstart.java | 44 +++----
.../java/org/apache/pinot/tools/Quickstart.java | 24 ++--
.../apache/pinot/tools/QuickstartTableRequest.java | 79 ++++++------
.../org/apache/pinot/tools/RealtimeQuickStart.java | 16 +--
.../org/apache/pinot/tools/UpsertQuickStart.java | 15 ++-
.../pinot/tools/admin/PinotAdministrator.java | 2 +
.../tools/admin/command/BootstrapTableCommand.java | 113 ++++++++++++++++++
.../tools/admin/command/QuickstartRunner.java | 19 ++-
10 files changed, 348 insertions(+), 102 deletions(-)
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
new file mode 100644
index 0000000..e795390
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.Reader;
+import java.net.URL;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.tools.admin.command.AddTableCommand;
+import org.apache.pinot.tools.utils.JarUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+
+/**
+ * Bootstraps a Pinot table (schema, table config(s) and, optionally, offline data) from a single directory.
+ *
+ * <p>The directory name is used as the table name. Inside the directory the tool looks for:
+ * <ul>
+ *   <li>{@code <table_name>_schema.json} (required)</li>
+ *   <li>{@code <table_name>_offline_table_config.json} (optional)</li>
+ *   <li>{@code <table_name>_realtime_table_config.json} (optional)</li>
+ *   <li>{@code ingestionJobSpec.yaml} (optional, only read when the offline table config exists)</li>
+ * </ul>
+ * At least one of the two table config files must be present.
+ */
+public class BootstrapTableTool {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapTableTool.class);
+  private static final String INGESTION_JOB_SPEC_FILE_NAME = "ingestionJobSpec.yaml";
+
+  private final String _controllerHost;
+  private final int _controllerPort;
+  private final String _tableDir;
+
+  /**
+   * @param controllerHost host of the Pinot controller to create the table on
+   * @param controllerPort HTTP port of the Pinot controller
+   * @param tableDir directory containing the table's schema, table config(s) and optional ingestion job spec
+   */
+  public BootstrapTableTool(String controllerHost, int controllerPort, String tableDir) {
+    _controllerHost = controllerHost;
+    _controllerPort = controllerPort;
+    _tableDir = tableDir;
+  }
+
+  /**
+   * Creates the offline and/or realtime table described in the table directory and, for the offline table, runs
+   * the data ingestion job when an ingestion job spec is present.
+   *
+   * @return {@code true}; every failure is reported by throwing
+   * @throws IllegalArgumentException if no table directory was provided
+   * @throws RuntimeException if the schema or both table configs are missing, or a table cannot be created
+   */
+  public boolean execute()
+      throws Exception {
+    if (_tableDir == null) {
+      throw new IllegalArgumentException("Table directory must be provided to bootstrap a table");
+    }
+    File setupTableTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
+
+    File tableDir = new File(_tableDir);
+    String tableName = tableDir.getName();
+    File schemaFile = new File(tableDir, String.format("%s_schema.json", tableName));
+    if (!schemaFile.exists()) {
+      throw new RuntimeException(
+          "Unable to find schema file for table - " + tableName + ", at " + schemaFile.getAbsolutePath());
+    }
+    // Each bootstrap*Table call throws on failure, so this stays false only when neither table config exists.
+    boolean tableCreationResult = false;
+    File offlineTableConfigFile = new File(tableDir, String.format("%s_offline_table_config.json", tableName));
+    if (offlineTableConfigFile.exists()) {
+      File ingestionJobSpecFile = new File(tableDir, INGESTION_JOB_SPEC_FILE_NAME);
+      tableCreationResult =
+          bootstrapOfflineTable(setupTableTmpDir, tableName, schemaFile, offlineTableConfigFile, ingestionJobSpecFile);
+    }
+    File realtimeTableConfigFile = new File(tableDir, String.format("%s_realtime_table_config.json", tableName));
+    if (realtimeTableConfigFile.exists()) {
+      tableCreationResult = bootstrapRealtimeTable(tableName, schemaFile, realtimeTableConfigFile);
+    }
+    if (!tableCreationResult) {
+      throw new RuntimeException(String
+          .format("Unable to find config files for table - %s, at location [%s] or [%s].", tableName,
+              offlineTableConfigFile.getAbsolutePath(), realtimeTableConfigFile.getAbsolutePath()));
+    }
+    return true;
+  }
+
+  /** Creates the realtime table, throwing if the table cannot be created. */
+  private boolean bootstrapRealtimeTable(String tableName, File schemaFile, File realtimeTableConfigFile)
+      throws Exception {
+    LOGGER.info("Adding realtime table {}", tableName);
+    if (!createTable(schemaFile, realtimeTableConfigFile)) {
+      throw new RuntimeException(String
+          .format("Unable to create realtime table - %s from schema file [%s] and table conf file [%s].", tableName,
+              schemaFile, realtimeTableConfigFile));
+    }
+    return true;
+  }
+
+  /** Adds the schema and table config through {@link AddTableCommand}; returns its result. */
+  private boolean createTable(File schemaFile, File tableConfigFile)
+      throws Exception {
+    return new AddTableCommand().setSchemaFile(schemaFile.getAbsolutePath())
+        .setTableConfigFile(tableConfigFile.getAbsolutePath()).setControllerHost(_controllerHost)
+        .setControllerPort(String.valueOf(_controllerPort)).setExecute(true).execute();
+  }
+
+  /**
+   * Creates the offline table and, when {@code ingestionJobSpecFile} exists, runs the ingestion job it describes
+   * to build segments and push them to the controller.
+   */
+  private boolean bootstrapOfflineTable(File setupTableTmpDir, String tableName, File schemaFile,
+      File offlineTableConfigFile, File ingestionJobSpecFile)
+      throws Exception {
+    LOGGER.info("Adding offline table: {}", tableName);
+    if (!createTable(schemaFile, offlineTableConfigFile)) {
+      throw new RuntimeException(String
+          .format("Unable to create offline table - %s from schema file [%s] and table conf file [%s].", tableName,
+              schemaFile, offlineTableConfigFile));
+    }
+
+    if (!ingestionJobSpecFile.exists()) {
+      LOGGER.info("Not found ingestionJobSpec.yaml at location [{}], skipping data ingestion",
+          ingestionJobSpecFile.getAbsolutePath());
+      return true;
+    }
+
+    LOGGER.info("Launch data ingestion job to build index segment for table {} and push to controller [{}:{}]",
+        tableName, _controllerHost, _controllerPort);
+    try (Reader reader = new BufferedReader(new FileReader(ingestionJobSpecFile.getAbsolutePath()))) {
+      SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class);
+      resolveInputDirURI(spec, setupTableTmpDir);
+      IngestionJobLauncher.runIngestionJob(spec);
+    }
+    return true;
+  }
+
+  /**
+   * Rewrites the spec's input dir when it is not an existing local path but can be found on the classpath (e.g.
+   * a path relative to the classpath root). Resources packaged in a jar are first extracted to
+   * {@code <setupTableTmpDir>/inputData}. Values found in neither place, such as URIs with a scheme, are left
+   * untouched for the ingestion job to resolve.
+   */
+  private static void resolveInputDirURI(SegmentGenerationJobSpec spec, File setupTableTmpDir)
+      throws Exception {
+    String inputDirURI = spec.getInputDirURI();
+    if (inputDirURI == null || new File(inputDirURI).exists()) {
+      return;
+    }
+    URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI);
+    if (resolvedInputDirURI == null) {
+      // Neither a local path nor a classpath resource (e.g. a URI such as s3://...): let the ingestion job resolve it.
+      LOGGER.warn("Input dir [{}] is neither a local path nor a classpath resource, passing it through as is",
+          inputDirURI);
+      return;
+    }
+    if ("jar".equals(resolvedInputDirURI.getProtocol())) {
+      // A jar URL's file part has the form "<jar-url>!/<entry>": split it into the jar location and the entry path.
+      String[] splits = resolvedInputDirURI.getFile().split("!");
+      String inputDir = new File(setupTableTmpDir, "inputData").toString();
+      JarUtils.copyResourcesToDirectory(splits[0], splits[1].substring(1), inputDir);
+      spec.setInputDirURI(inputDir);
+    } else {
+      spec.setInputDirURI(resolvedInputDirURI.toString());
+    }
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
index 0096a4b..09d7522 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
@@ -62,7 +62,7 @@ public class GitHubEventsQuickstart {
public void execute(String personalAccessToken)
throws Exception {
- final File quickStartDataDir = new File("githubEvents" + System.currentTimeMillis());
+ final File quickStartDataDir = new File(new File("githubEvents-" + System.currentTimeMillis()), "pullRequestMergedEvents");
if (!quickStartDataDir.exists()) {
Preconditions.checkState(quickStartDataDir.mkdirs());
@@ -82,7 +82,7 @@ public class GitHubEventsQuickstart {
File tempDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
Preconditions.checkState(tempDir.mkdirs());
- QuickstartTableRequest request = new QuickstartTableRequest("pullRequestMergedEvents", schemaFile, tableConfigFile);
+ QuickstartTableRequest request = new QuickstartTableRequest(quickStartDataDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, tempDir);
printStatus(Color.CYAN, "***** Starting Kafka *****");
@@ -92,7 +92,7 @@ public class GitHubEventsQuickstart {
runner.startAll();
printStatus(Color.CYAN, "***** Adding pullRequestMergedEvents table *****");
- runner.addTable();
+ runner.bootstrapTable();
printStatus(Color.CYAN, "***** Starting pullRequestMergedEvents data stream and publishing to Kafka *****");
final PullRequestMergedEventsStream pullRequestMergedEventsStream =
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index d682218..4d785ee 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -27,7 +27,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
@@ -57,12 +56,11 @@ public class HybridQuickstart {
new HybridQuickstart().execute();
}
- private QuickstartTableRequest prepareOfflineTableRequest(File configDir)
+ private QuickstartTableRequest prepareTableRequest(File baseDir)
throws IOException {
-
- _schemaFile = new File(configDir, "airlineStats_schema.json");
- _ingestionJobSpecFile = new File(configDir, "ingestionJobSpec.yaml");
- File tableConfigFile = new File(configDir, "airlineStats_offline_table_config.json");
+ _schemaFile = new File(baseDir, "airlineStats_schema.json");
+ _ingestionJobSpecFile = new File(baseDir, "ingestionJobSpec.yaml");
+ File tableConfigFile = new File(baseDir, "airlineStats_offline_table_config.json");
ClassLoader classLoader = Quickstart.class.getClassLoader();
URL resource = classLoader.getResource("examples/batch/airlineStats/airlineStats_schema.json");
@@ -75,26 +73,18 @@ public class HybridQuickstart {
Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, tableConfigFile);
- return new QuickstartTableRequest("airlineStats", _schemaFile, tableConfigFile, _ingestionJobSpecFile,
- FileFormat.AVRO);
- }
-
- private QuickstartTableRequest prepareRealtimeTableRequest(File configDir)
- throws IOException {
-
- _dataFile = new File(configDir, "airlineStats_data.avro");
- _realtimeTableConfigFile = new File(configDir, "airlineStats_realtime_table_config.json");
-
- URL resource = Quickstart.class.getClassLoader()
+ _realtimeTableConfigFile = new File(baseDir, "airlineStats_realtime_table_config.json");
+ resource = Quickstart.class.getClassLoader()
.getResource("examples/stream/airlineStats/airlineStats_realtime_table_config.json");
Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, _realtimeTableConfigFile);
resource = Quickstart.class.getClassLoader()
.getResource("examples/stream/airlineStats/sample_data/airlineStats_data.avro");
Preconditions.checkNotNull(resource);
+ _dataFile = new File(baseDir, "airlineStats_data.avro");
FileUtils.copyURLToFile(resource, _dataFile);
- return new QuickstartTableRequest("airlineStats", _schemaFile, _realtimeTableConfigFile);
+ return new QuickstartTableRequest(baseDir.getAbsolutePath());
}
private void startKafka() {
@@ -111,16 +101,13 @@ public class HybridQuickstart {
public void execute()
throws Exception {
-
File quickstartTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
- File configDir = new File(quickstartTmpDir, "configs");
- File dataDir = new File(quickstartTmpDir, "data");
- Preconditions.checkState(configDir.mkdirs());
+ File baseDir = new File(quickstartTmpDir, "airlineStats");
+ File dataDir = new File(baseDir, "data");
Preconditions.checkState(dataDir.mkdirs());
- QuickstartTableRequest offlineRequest = prepareOfflineTableRequest(configDir);
- QuickstartTableRequest realtimeTableRequest = prepareRealtimeTableRequest(configDir);
+ QuickstartTableRequest bootstrapTableRequest = prepareTableRequest(baseDir);
final QuickstartRunner runner =
- new QuickstartRunner(Lists.newArrayList(offlineRequest, realtimeTableRequest), 1, 1, 1, dataDir);
+ new QuickstartRunner(Lists.newArrayList(bootstrapTableRequest), 1, 1, 1, dataDir);
printStatus(Color.YELLOW, "***** Starting Kafka *****");
startKafka();
printStatus(Color.YELLOW, "***** Starting airline data stream and publishing to Kafka *****");
@@ -142,11 +129,8 @@ public class HybridQuickstart {
e.printStackTrace();
}
}));
- printStatus(Color.YELLOW, "***** Adding airlineStats offline and realtime table *****");
- runner.addTable();
- printStatus(Color.YELLOW,
- "***** Launch data ingestion job to build index segments for airlineStats and push to controller *****");
- runner.launchDataIngestionJob();
+ printStatus(Color.YELLOW, "***** Bootstrap airlineStats offline and realtime table *****");
+ runner.bootstrapTable();
printStatus(Color.YELLOW, "***** Pinot Hybrid with hybrid table setup is complete *****");
printStatus(Color.YELLOW, "***** Sequence of operations *****");
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
index b28481a..69f5941 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
@@ -135,15 +134,14 @@ public class Quickstart {
public void execute()
throws Exception {
File quickstartTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
- File configDir = new File(quickstartTmpDir, "configs");
- File dataDir = new File(quickstartTmpDir, "data");
- Preconditions.checkState(configDir.mkdirs());
+ File baseDir = new File(quickstartTmpDir, "baseballStats");
+ File dataDir = new File(baseDir, "rawdata");
Preconditions.checkState(dataDir.mkdirs());
- File schemaFile = new File(configDir, "baseballStats_schema.json");
- File dataFile = new File(configDir, "baseballStats_data.csv");
- File tableConfigFile = new File(configDir, "baseballStats_offline_table_config.json");
- File ingestionJobSpecFile = new File(configDir, "ingestionJobSpec.yaml");
+ File schemaFile = new File(baseDir, "baseballStats_schema.json");
+ File tableConfigFile = new File(baseDir, "baseballStats_offline_table_config.json");
+ File ingestionJobSpecFile = new File(baseDir, "ingestionJobSpec.yaml");
+ File dataFile = new File(dataDir, "baseballStats_data.csv");
ClassLoader classLoader = Quickstart.class.getClassLoader();
URL resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_schema.json");
@@ -159,8 +157,7 @@ public class Quickstart {
com.google.common.base.Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, tableConfigFile);
- QuickstartTableRequest request =
- new QuickstartTableRequest("baseballStats", schemaFile, tableConfigFile, ingestionJobSpecFile, FileFormat.CSV);
+ QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir);
printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****");
@@ -174,10 +171,9 @@ public class Quickstart {
e.printStackTrace();
}
}));
- printStatus(Color.CYAN, "***** Adding baseballStats table *****");
- runner.addTable();
- printStatus(Color.CYAN, "***** Launch data ingestion job to build index segment for baseballStats and push to controller *****");
- runner.launchDataIngestionJob();
+ printStatus(Color.CYAN, "***** Bootstrap baseballStats table *****");
+ runner.bootstrapTable();
+
printStatus(Color.CYAN, "***** Waiting for 5 seconds for the server to fetch the assigned segment *****");
Thread.sleep(5000);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
index 117ccf1..fcaa32f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
@@ -20,80 +20,87 @@ package org.apache.pinot.tools;
import java.io.File;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.readers.FileFormat;
public class QuickstartTableRequest {
- File schemaFile;
- File tableRequestFile;
- File ingestionJobFile;
- TableType tableType;
- String tableName;
- FileFormat segmentFileFormat = FileFormat.CSV;
+ private String _tableName;
+ private TableType _tableType;
+ private File _schemaFile;
+ private File _tableRequestFile;
+ private File _ingestionJobFile;
+ private String _bootstrapTableDir;
- public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest, File ingestionJobFile,
- FileFormat segmentFileFormat) {
- this.tableName = tableName;
- this.schemaFile = schemaFile;
- this.tableRequestFile = tableRequest;
- tableType = TableType.OFFLINE;
- this.segmentFileFormat = segmentFileFormat;
- this.ingestionJobFile = ingestionJobFile;
+ public QuickstartTableRequest(String bootstrapTableDir) {
+ this._bootstrapTableDir = bootstrapTableDir;
}
- public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest) {
- this.tableName = tableName;
- this.schemaFile = schemaFile;
- this.tableRequestFile = tableRequest;
- tableType = TableType.REALTIME;
- }
-
- public FileFormat getSegmentFileFormat() {
- return segmentFileFormat;
+ public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest, File ingestionJobFile) {
+ this._tableName = tableName;
+ this._schemaFile = schemaFile;
+ this._tableRequestFile = tableRequest;
+ _tableType = TableType.OFFLINE;
+ this._ingestionJobFile = ingestionJobFile;
}
- public void setSegmentFileFormat(FileFormat segmentFileFormat) {
- this.segmentFileFormat = segmentFileFormat;
+ public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest) {
+ this._tableName = tableName;
+ this._schemaFile = schemaFile;
+ this._tableRequestFile = tableRequest;
+ _tableType = TableType.REALTIME;
}
public File getSchemaFile() {
- return schemaFile;
+ return _schemaFile;
}
public void setSchemaFile(File schemaFile) {
- this.schemaFile = schemaFile;
+ this._schemaFile = schemaFile;
}
public File getTableRequestFile() {
- return tableRequestFile;
+ return _tableRequestFile;
}
public void setTableRequestFile(File tableRequestFile) {
- this.tableRequestFile = tableRequestFile;
+ this._tableRequestFile = tableRequestFile;
}
public File getIngestionJobFile() {
- return ingestionJobFile;
+ return _ingestionJobFile;
}
public void setIngestionJobFile(File ingestionJobFile) {
- this.ingestionJobFile = ingestionJobFile;
+ this._ingestionJobFile = ingestionJobFile;
}
public TableType getTableType() {
- return tableType;
+ return _tableType;
}
public void setTableType(TableType tableType) {
- this.tableType = tableType;
+ this._tableType = tableType;
}
public String getTableName() {
- return tableName;
+ return _tableName;
}
public void setTableName(String tableName) {
- this.tableName = tableName;
+ this._tableName = tableName;
+ }
+
+ public String getBootstrapTableDir() {
+ return _bootstrapTableDir;
+ }
+
+ public void setBootstrapTableDir(String bootstrapTableDir) {
+ this._bootstrapTableDir = bootstrapTableDir;
+ }
+
+ public String toString() {
+ return "{ tableName = " + _tableName + ", tableType = " + _tableType + ", schemaFile = " + _schemaFile
+ + ", tableRequestFile = " + _tableRequestFile + ", ingestionJobFile = " + _ingestionJobFile
+ + ", bootstrapTableDir = " + _bootstrapTableDir + " }";
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 0af2b7c..2b1f9a0 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -48,13 +48,13 @@ public class RealtimeQuickStart {
public void execute()
throws Exception {
File quickstartTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
- File configDir = new File(quickstartTmpDir, "configs");
- File dataDir = new File(quickstartTmpDir, "data");
- Preconditions.checkState(configDir.mkdirs());
+
+ File baseDir = new File(quickstartTmpDir, "meetupRsvp");
+ File dataDir = new File(baseDir, "rawdata");
Preconditions.checkState(dataDir.mkdirs());
- File schemaFile = new File(configDir, "meetupRsvp_schema.json");
- File tableConfigFile = new File(configDir, "meetupRsvp_realtime_table_config.json");
+ File schemaFile = new File(baseDir, "meetupRsvp_schema.json");
+ File tableConfigFile = new File(baseDir, "meetupRsvp_realtime_table_config.json");
ClassLoader classLoader = Quickstart.class.getClassLoader();
URL resource = classLoader.getResource("examples/stream/meetupRsvp/meetupRsvp_schema.json");
@@ -64,7 +64,7 @@ public class RealtimeQuickStart {
com.google.common.base.Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, tableConfigFile);
- QuickstartTableRequest request = new QuickstartTableRequest("meetupRsvp", schemaFile, tableConfigFile);
+ QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir);
printStatus(Color.CYAN, "***** Starting Kafka *****");
@@ -93,8 +93,8 @@ public class RealtimeQuickStart {
e.printStackTrace();
}
}));
- printStatus(Color.CYAN, "***** Adding meetupRSVP table *****");
- runner.addTable();
+ printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****");
+ runner.bootstrapTable();
printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****");
Thread.sleep(5000);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
index a457cac..e57174c 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
@@ -48,13 +48,12 @@ public class UpsertQuickStart {
public void execute()
throws Exception {
File quickstartTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
- File configDir = new File(quickstartTmpDir, "configs");
- File dataDir = new File(quickstartTmpDir, "data");
- Preconditions.checkState(configDir.mkdirs());
+ File bootstrapTableDir = new File(quickstartTmpDir, "meetupRsvp");
+ File dataDir = new File(bootstrapTableDir, "data");
Preconditions.checkState(dataDir.mkdirs());
- File schemaFile = new File(configDir, "upsert_meetupRsvp_schema.json");
- File tableConfigFile = new File(configDir, "upsert_meetupRsvp_realtime_table_config.json");
+ File schemaFile = new File(bootstrapTableDir, "meetupRsvp_schema.json");
+ File tableConfigFile = new File(bootstrapTableDir, "meetupRsvp_realtime_table_config.json");
ClassLoader classLoader = Quickstart.class.getClassLoader();
URL resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json");
@@ -64,7 +63,7 @@ public class UpsertQuickStart {
com.google.common.base.Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, tableConfigFile);
- QuickstartTableRequest request = new QuickstartTableRequest("meetupRsvp", schemaFile, tableConfigFile);
+ QuickstartTableRequest request = new QuickstartTableRequest(bootstrapTableDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir);
printStatus(Color.CYAN, "***** Starting Kafka *****");
@@ -94,8 +93,8 @@ public class UpsertQuickStart {
e.printStackTrace();
}
}));
- printStatus(Color.CYAN, "***** Adding meetupRSVP(upsert) table *****");
- runner.addTable();
+ printStatus(Color.CYAN, "***** Bootstrap meetupRSVP(upsert) table *****");
+ runner.bootstrapTable();
printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****");
Thread.sleep(5000);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
index cdd620f..83c90da 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
@@ -41,6 +41,7 @@ import org.apache.pinot.tools.admin.command.QuickStartCommand;
import org.apache.pinot.tools.admin.command.RealtimeProvisioningHelperCommand;
import org.apache.pinot.tools.admin.command.RebalanceTableCommand;
import org.apache.pinot.tools.admin.command.SegmentProcessorFrameworkCommand;
+import org.apache.pinot.tools.admin.command.BootstrapTableCommand;
import org.apache.pinot.tools.admin.command.ShowClusterInfoCommand;
import org.apache.pinot.tools.admin.command.StartBrokerCommand;
import org.apache.pinot.tools.admin.command.StartControllerCommand;
@@ -127,6 +128,7 @@ public class PinotAdministrator {
@SubCommand(name = "AnonymizeData", impl = AnonymizeDataCommand.class),
@SubCommand(name = "GitHubEventsQuickStart", impl = GitHubEventsQuickStartCommand.class),
@SubCommand(name = "StreamGitHubEvents", impl = StreamGitHubEventsCommand.class),
+ @SubCommand(name = "BootstrapTable", impl = BootstrapTableCommand.class),
@SubCommand(name = "SegmentProcessorFramework", impl = SegmentProcessorFrameworkCommand.class)
})
Command _subCommand;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
new file mode 100644
index 0000000..7d725fc
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.admin.command;
+
+import java.net.InetAddress;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.tools.BootstrapTableTool;
+import org.apache.pinot.tools.Command;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The command to bootstrap a Pinot table from a directory with table schema/config/ingestionJobSpec/raw data files.
+ *
+ * <p>Sample usage:
+ * {@code pinot-admin.sh BootstrapTable -dir <path-to-table-configs-directory>
+ *     [-controllerHost <host>] [-controllerPort <port>]}
+ *
+ * <p>The directory name is used as the table name, and its layout follows the current example conventions.
+ * For an offline table:
+ * <pre>{@code
+ * <table_name>/
+ * <table_name>/<table_name>_schema.json
+ * <table_name>/<table_name>_offline_table_config.json
+ * <table_name>/ingestionJobSpec.yaml
+ * <table_name>/rawdata/...
+ * }</pre>
+ *
+ * For a realtime table:
+ * <pre>{@code
+ * <table_name>/
+ * <table_name>/<table_name>_schema.json
+ * <table_name>/<table_name>_realtime_table_config.json
+ * }</pre>
+ *
+ * For a hybrid table:
+ * <pre>{@code
+ * <table_name>/
+ * <table_name>/<table_name>_schema.json
+ * <table_name>/<table_name>_offline_table_config.json
+ * <table_name>/<table_name>_realtime_table_config.json
+ * <table_name>/ingestionJobSpec.yaml
+ * <table_name>/rawdata/...
+ * }</pre>
+ */
+public class BootstrapTableCommand extends AbstractBaseAdminCommand implements Command {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapTableCommand.class.getName());
+
+  // NOTE: option fields must not be final. A final field initialized with a constant expression is a compile-time
+  // constant whose reads are inlined, so a value injected by args4j would never be observed.
+  @Option(name = "-controllerHost", required = false, metaVar = "<String>", usage = "host name for controller.")
+  private String _controllerHost;
+
+  @Option(name = "-controllerPort", required = false, metaVar = "<int>", usage = "http port for controller.")
+  private String _controllerPort = DEFAULT_CONTROLLER_PORT;
+
+  @Option(name = "-dir", required = false, aliases = {"-d", "-directory"}, metaVar = "<String>",
+      usage = "The directory contains all the configs and data to bootstrap a table")
+  private String _dir;
+
+  @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"},
+      usage = "Print this message.")
+  private boolean _help = false;
+
+  @Override
+  public boolean getHelp() {
+    return _help;
+  }
+
+  @Override
+  public String getName() {
+    return "BootstrapTable";
+  }
+
+  public BootstrapTableCommand setDir(String dir) {
+    _dir = dir;
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return ("BootstrapTable -controllerHost " + _controllerHost + " -controllerPort " + _controllerPort + " -dir "
+        + _dir);
+  }
+
+  @Override
+  public void cleanup() {
+    // Nothing to clean up: this command holds no resources of its own.
+  }
+
+  @Override
+  public String description() {
+    return "Run Pinot Bootstrap Table.";
+  }
+
+  /**
+   * Initializes the plugin manager and bootstraps the table found in the {@code -dir} directory.
+   *
+   * @throws IllegalArgumentException if {@code -dir} was not provided
+   */
+  @Override
+  public boolean execute()
+      throws Exception {
+    if (_dir == null) {
+      throw new IllegalArgumentException("Option -dir is required: the directory of the table to bootstrap");
+    }
+    if (_controllerHost == null) {
+      // Default to this machine's host name when no controller host was given.
+      _controllerHost = InetAddress.getLocalHost().getHostName();
+    }
+    LOGGER.info("Executing command: {}", this);
+    PluginManager.get().init();
+    return new BootstrapTableTool(_controllerHost, Integer.parseInt(_controllerPort), _dir).execute();
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index fd8aa21..e2e3e38 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
+import java.net.InetAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.QuickstartTableRequest;
+import org.apache.pinot.tools.BootstrapTableTool;
import org.apache.pinot.tools.utils.JarUtils;
import org.yaml.snakeyaml.Yaml;
@@ -169,6 +171,17 @@ public class QuickstartRunner {
.setInstances(number).setRole(TenantRole.BROKER).setExecute(true).execute();
}
+ public void bootstrapTable()
+ throws Exception {
+ for (QuickstartTableRequest request : _tableRequests) {
+ if (!new BootstrapTableTool(InetAddress.getLocalHost().getHostName(), _controllerPorts.get(0), request.getBootstrapTableDir())
+ .execute()) {
+ throw new RuntimeException("Failed to bootstrap table with request - " + request);
+ }
+ }
+ }
+
+ @Deprecated
public void addTable()
throws Exception {
for (QuickstartTableRequest request : _tableRequests) {
@@ -178,6 +191,7 @@ public class QuickstartRunner {
}
}
+ @Deprecated
public void launchDataIngestionJob()
throws Exception {
for (QuickstartTableRequest request : _tableRequests) {
@@ -205,8 +219,7 @@ public class QuickstartRunner {
public JsonNode runQuery(String query)
throws Exception {
int brokerPort = _brokerPorts.get(RANDOM.nextInt(_brokerPorts.size()));
- return JsonUtils
- .stringToJsonNode(new PostQueryCommand().setBrokerPort(String.valueOf(brokerPort)).setQueryType(
- CommonConstants.Broker.Request.SQL).setQuery(query).run());
+ return JsonUtils.stringToJsonNode(new PostQueryCommand().setBrokerPort(String.valueOf(brokerPort))
+ .setQueryType(CommonConstants.Broker.Request.SQL).setQuery(query).run());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org