You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/10/17 01:13:24 UTC

[incubator-nemo] 07/14: query from file

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch tpch-fix
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit a471303f5b3a0ee29c5a674849f21eb20d71baca
Author: John Yang <jo...@apache.org>
AuthorDate: Wed Sep 12 17:33:19 2018 +0900

    query from file
---
 .../org/apache/nemo/examples/beam/tpch/Tpch.java   | 56 ++++++++++++++++++++--
 .../apache/nemo/examples/beam/SQLTpchITCase.java   | 20 ++++++++
 2 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
index 42db3da..0b62e8e 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
@@ -19,7 +19,6 @@ package org.apache.nemo.examples.beam.tpch;
 import com.google.common.collect.ImmutableMap;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
-import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTable;
 import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -35,9 +34,16 @@ import org.apache.nemo.examples.beam.GenericSourceSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
 
@@ -214,7 +220,7 @@ public final class Tpch {
 
     PCollectionTuple tables = PCollectionTuple.empty(pipeline);
     for (final Map.Entry<String, Schema> tableSchema : hSchemas.entrySet()) {
-      final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl*";
+      final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl";
       final PCollection<Row> table = GenericSourceSink.read(pipeline, filePattern)
         .apply("StringToRow", new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat))
         .setCoder(tableSchema.getValue().getRowCoder())
@@ -242,7 +248,7 @@ public final class Tpch {
     idToQuery.put(5, QUERY5);
     idToQuery.put(6, QUERY6);
 
-    LOG.info("{} / {}", queryId, inputDirectory, outputFilePath);
+    LOG.info("{} / {} / {}", queryId, inputDirectory, outputFilePath);
 
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
     options.setRunner(NemoPipelineRunner.class);
@@ -257,7 +263,7 @@ public final class Tpch {
     final PCollectionTuple tables = getHTables(p, csvFormat, inputDirectory);
 
     // Run the TPC-H query
-    final PCollection<Row> result = tables.apply(SqlTransform.query(idToQuery.get(queryId)));
+    final PCollection<Row> result = tables.apply(SqlTransform.query(getQueryString(queryId)));
 
     final PCollection<String> resultToWrite = result.apply(MapElements.into(TypeDescriptors.strings()).via(
       new SerializableFunction<Row, String>() {
@@ -273,4 +279,46 @@ public final class Tpch {
     // Then run
     p.run();
   }
+
+  /**
+   * Reads the SQL text of a TPC-H query from its .sql file and flattens it into a single line.
+   * Lines before the first line that is exactly "select" are skipped, the last line of the file is
+   * dropped, and tabs are replaced with spaces.
+   *
+   * @param queryNum the TPC-H query number, used to build the file name tpch_query{queryNum}.sql.
+   * @return the query on one line, with every kept source line prefixed by a single space.
+   * @throws IllegalStateException if the file contains no line equal to "select".
+   */
+  private static String getQueryString(final int queryNum) {
+    // NOTE(review): machine-specific absolute path; it should come from configuration or a program argument.
+    final String path =
+      "/Users/johnyang/Documents/workspace/hive-testbench/sample-queries-tpch/tpch_query" + queryNum + ".sql";
+
+    final List<String> lines = new ArrayList<>();
+    try (final Stream<String> stream = Files.lines(Paths.get(path))) {
+      boolean isStarted = false;
+      for (final String line : stream.collect(Collectors.toList())) {
+        isStarted = isStarted || line.equals("select");
+        if (isStarted) {
+          lines.add(line);
+        }
+      }
+    } catch (final IOException e) {
+      throw new RuntimeException("Failed to read TPC-H query file " + path, e);
+    }
+    // Without this guard the remove() below fails with an opaque IndexOutOfBoundsException.
+    if (lines.isEmpty()) {
+      throw new IllegalStateException("No line equal to \"select\" found in " + path);
+    }
+    // Presumably the last line is the statement terminator -- TODO confirm against the query files.
+    lines.remove(lines.size() - 1);
+
+    // Files.lines() already strips line terminators, so only tabs need replacing.
+    final StringBuilder sb = new StringBuilder();
+    for (final String line : lines) {
+      sb.append(' ').append(line.replace('\t', ' '));
+    }
+    LOG.debug("TPC-H query {}: {}", queryNum, sb);
+    return sb.toString();
+  }
 }
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
index 87df9ae..80e9f68 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
@@ -27,6 +27,15 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 /**
  * Test TPC-H program with JobLauncher.
  */
@@ -58,6 +67,17 @@ public final class SQLTpchITCase {
   }
 
   @Test (timeout = TIMEOUT)
+  public void testXX() throws Exception { // NOTE(review): placeholder name -- launches Tpch for query 12.
+    final int queryNum = 12; // NOTE(review): the input directory below is a machine-specific absolute path.
+    JobLauncher.main(builder
+      .addUserMain(Tpch.class.getCanonicalName())
+      .addUserArgs(String.valueOf(queryNum), "/home/johnyangk/Desktop/tpc-concat-tbls/", outputFilePath)
+      .addJobId(SQLTpchITCase.class.getSimpleName())
+      .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+      .build());
+  }
+
+  @Test (timeout = TIMEOUT)
   public void testThree() throws Exception {
     final int queryNum = 3;
     JobLauncher.main(builder