Posted to commits@nemo.apache.org by jo...@apache.org on 2018/10/17 01:13:24 UTC
[incubator-nemo] 07/14: query from file
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch tpch-fix
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit a471303f5b3a0ee29c5a674849f21eb20d71baca
Author: John Yang <jo...@apache.org>
AuthorDate: Wed Sep 12 17:33:19 2018 +0900
query from file
---
.../org/apache/nemo/examples/beam/tpch/Tpch.java | 56 ++++++++++++++++++++--
.../apache/nemo/examples/beam/SQLTpchITCase.java | 20 ++++++++
2 files changed, 72 insertions(+), 4 deletions(-)
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
index 42db3da..0b62e8e 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
@@ -19,7 +19,6 @@ package org.apache.nemo.examples.beam.tpch;
import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
-import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -35,9 +34,16 @@ import org.apache.nemo.examples.beam.GenericSourceSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
@@ -214,7 +220,7 @@ public final class Tpch {
PCollectionTuple tables = PCollectionTuple.empty(pipeline);
for (final Map.Entry<String, Schema> tableSchema : hSchemas.entrySet()) {
- final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl*";
+ final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl";
final PCollection<Row> table = GenericSourceSink.read(pipeline, filePattern)
.apply("StringToRow", new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat))
.setCoder(tableSchema.getValue().getRowCoder())
@@ -242,7 +248,7 @@ public final class Tpch {
idToQuery.put(5, QUERY5);
idToQuery.put(6, QUERY6);
- LOG.info("{} / {}", queryId, inputDirectory, outputFilePath);
+ LOG.info("{} / {} / {}", queryId, inputDirectory, outputFilePath);
final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
options.setRunner(NemoPipelineRunner.class);
@@ -257,7 +263,7 @@ public final class Tpch {
final PCollectionTuple tables = getHTables(p, csvFormat, inputDirectory);
// Run the TPC-H query
- final PCollection<Row> result = tables.apply(SqlTransform.query(idToQuery.get(queryId)));
+ final PCollection<Row> result = tables.apply(SqlTransform.query(getQueryString(queryId)));
final PCollection<String> resultToWrite = result.apply(MapElements.into(TypeDescriptors.strings()).via(
new SerializableFunction<Row, String>() {
@@ -273,4 +279,46 @@ public final class Tpch {
// Then run
p.run();
}
+
+ private static String getQueryString(final int queryNum) {
+ boolean isStarted = false;
+ final List<String> lines = new ArrayList<>();
+
+
+ final String path =
+ "/Users/johnyang/Documents/workspace/hive-testbench/sample-queries-tpch/tpch_query" + queryNum + ".sql";
+
+ try (final Stream<String> stream = Files.lines(Paths.get(path))) {
+ for (final String line : stream.collect(Collectors.toList())) {
+ if (line.equals("select")) {
+ isStarted = true;
+ }
+
+ if (isStarted) {
+ lines.add(line);
+ }
+ }
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ System.out.println(lines);
+ lines.remove(lines.size() - 1);
+
+ final StringBuilder sb = new StringBuilder();
+ lines.forEach(line -> {
+ sb.append(" ");
+ sb.append(line);
+ });
+
+ final String concate = sb.toString();
+ System.out.println(concate);
+ final String cleanOne = concate.replaceAll("\n", " ");
+ System.out.println(cleanOne);
+ final String cleanTwo = cleanOne.replaceAll("\t", " ");
+ System.out.println(cleanTwo);
+
+ return cleanTwo;
+ }
}
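
Note: the getQueryString helper added above reads the per-query SQL from a developer-specific absolute path and cleans the text with ad-hoc string handling and debug prints. A minimal sketch of the same file-based loading, assuming the query directory is supplied by the caller rather than hardcoded (the TpchQueryLoader class, loadQuery method, and queryDirectory parameter below are illustrative names, not part of this commit):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

final class TpchQueryLoader {
  private TpchQueryLoader() {
  }

  /**
   * Reads "tpch_query<N>.sql" from the given directory, keeps everything from the
   * first "select" line onward, drops the trailing line, and collapses tabs and
   * newlines into spaces. This mirrors the parsing done in Tpch#getQueryString,
   * but takes the directory as an argument instead of a hardcoded absolute path.
   */
  static String loadQuery(final String queryDirectory, final int queryNum) {
    final String path = queryDirectory + "/tpch_query" + queryNum + ".sql";
    try {
      final List<String> allLines = Files.readAllLines(Paths.get(path));
      // Skip any header lines (e.g. settings) that precede the "select" keyword.
      final int start = allLines.indexOf("select");
      // Drop the last line of the file, as the original helper does.
      final List<String> queryLines =
          allLines.subList(start < 0 ? 0 : start, allLines.size() - 1);
      // Join with spaces and normalize tabs so the query becomes a single line of SQL.
      return queryLines.stream()
          .map(line -> line.replace("\t", " "))
          .collect(Collectors.joining(" "))
          .trim();
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

A call site or test could then invoke something like loadQuery("/path/to/sample-queries-tpch", 3), avoiding the machine-specific path baked into this commit.
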
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
index 87df9ae..80e9f68 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
@@ -27,6 +27,15 @@ import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
/**
* Test TPC-H program with JobLauncher.
*/
@@ -58,6 +67,17 @@ public final class SQLTpchITCase {
}
@Test (timeout = TIMEOUT)
+ public void testXX() throws Exception {
+ final int queryNum = 12;
+ JobLauncher.main(builder
+ .addUserMain(Tpch.class.getCanonicalName())
+ .addUserArgs(String.valueOf(queryNum), "/home/johnyangk/Desktop/tpc-concat-tbls/", outputFilePath)
+ .addJobId(SQLTpchITCase.class.getSimpleName())
+ .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+ .build());
+ }
+
+ @Test (timeout = TIMEOUT)
public void testThree() throws Exception {
final int queryNum = 3;
JobLauncher.main(builder