Posted to commits@nemo.apache.org by jo...@apache.org on 2018/10/17 01:13:26 UTC

[incubator-nemo] 09/14: load only the used tables

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch tpch-fix
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 5ff7062aee00d5a8ec74935e2d4330da71398127
Author: John Yang <jo...@apache.org>
AuthorDate: Thu Sep 13 10:25:36 2018 +0900

    load only the used tables
---
 .../org/apache/nemo/examples/beam/tpch/Tpch.java   | 159 ++++-----------------
 1 file changed, 28 insertions(+), 131 deletions(-)
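
The patch below removes the now-unused QUERY1..QUERY6 constants and the idToQuery map, and instead passes the query text into getHTables() so that only the .tbl files whose table names actually appear in the query are read. A minimal, self-contained sketch of that filtering idea follows; the class and method names (UsedTableFilter, usedTables) are illustrative and do not exist in Tpch.java, and the sketch splits on any whitespace run rather than the single-space split used in the patch:

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    /**
     * Sketch of the filtering idea in this commit: a table is loaded only if
     * its name occurs among the tokens of the query text. Names here are
     * illustrative, not taken from Tpch.java.
     */
    public final class UsedTableFilter {
      private UsedTableFilter() {
      }

      /**
       * @param query      the SQL query text.
       * @param tableNames candidate table names, e.g. "lineitem", "orders".
       * @return the subset of tableNames that occur as tokens in the query.
       */
      public static Set<String> usedTables(final String query, final List<String> tableNames) {
        // The patch splits on a single space; splitting on any whitespace run
        // is an assumption made here so tab/newline-separated names also match.
        final List<String> tokens = Arrays.asList(query.split("\\s+"));
        final Set<String> used = new LinkedHashSet<>();
        for (final String name : tableNames) {
          if (tokens.contains(name)) {
            used.add(name);
          }
        }
        return used;
      }

      public static void main(final String[] args) {
        final String q = "select count(*) from lineitem where l_quantity < 24";
        // Prints [lineitem]: orders and customer are never mentioned, so their
        // .tbl files would not be read.
        System.out.println(usedTables(q, Arrays.asList("lineitem", "orders", "customer")));
      }
    }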

diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
index 16a9297..bf1393c 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
@@ -38,10 +38,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -54,115 +51,6 @@ import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beam
 public final class Tpch {
   private static final Logger LOG = LoggerFactory.getLogger(Tpch.class.getName());
 
-  public static final String QUERY1 =
-    "select\n"
-      + "\tl_returnflag,\n"
-      + "\tl_linestatus,\n"
-      + "\tsum(l_quantity) as sum_qty,\n"
-      + "\tsum(l_extendedprice) as sum_base_price,\n"
-      + "\tsum(l_extendedprice * (1 - l_discount)) as sum_disc_price,\n"
-      + "\tsum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,\n"
-      + "\tavg(l_quantity) as avg_qty,\n"
-      + "\tavg(l_extendedprice) as avg_price,\n"
-      + "\tavg(l_discount) as avg_disc,\n"
-      + "\tcount(*) as count_order\n"
-      + "from\n"
-      + "\tlineitem\n"
-      + "where\n"
-      + "\tl_shipdate <= date '1998-12-01' - interval '90' day (3)\n"
-      + "group by\n"
-      + "\tl_returnflag,\n"
-      + "\tl_linestatus\n"
-      + "order by\n"
-      + "\tl_returnflag,\n"
-      + "\tl_linestatus limit 10";
-
-  public static final String QUERY3 =
-    "select\n"
-      + "\tl_orderkey,\n"
-      + "\tsum(l_extendedprice * (1 - l_discount)) as revenue,\n"
-      + "\to_orderdate,\n"
-      + "\to_shippriority\n"
-      + "from\n"
-      + "\tcustomer,\n"
-      + "\torders,\n"
-      + "\tlineitem\n"
-      + "where\n"
-      + "\tc_mktsegment = 'BUILDING'\n"
-      + "\tand c_custkey = o_custkey\n"
-      + "\tand l_orderkey = o_orderkey\n"
-      + "\tand o_orderdate < date '1995-03-15'\n"
-      + "\tand l_shipdate > date '1995-03-15'\n"
-      + "group by\n"
-      + "\tl_orderkey,\n"
-      + "\to_orderdate,\n"
-      + "\to_shippriority\n"
-      + "order by\n"
-      + "\trevenue desc,\n"
-      + "\to_orderdate\n"
-      + "limit 10";
-
-  public static final String QUERY4 =
-    "select\n"
-      + "\to_orderpriority,\n"
-      + "\tcount(*) as order_count\n"
-      + "from\n"
-      + "\torders\n"
-      + "where\n"
-      + "\to_orderdate >= date '1993-07-01'\n"
-      + "\tand o_orderdate < date '1993-07-01' + interval '3' month\n"
-      + "\tand exists (\n"
-      + "\t\tselect\n"
-      + "\t\t\t*\n"
-      + "\t\tfrom\n"
-      + "\t\t\tlineitem\n"
-      + "\t\twhere\n"
-      + "\t\t\tl_orderkey = o_orderkey\n"
-      + "\t\t\tand l_commitdate < l_receiptdate\n"
-      + "\t)\n"
-      + "group by\n"
-      + "\to_orderpriority\n"
-      + "order by\n"
-      + "\to_orderpriority limit 10";
-
-  public static final String QUERY5 =
-    "select\n"
-      + "\tn_name,\n"
-      + "\tsum(l_extendedprice * (1 - l_discount)) as revenue\n"
-      + "from\n"
-      + "\tcustomer,\n"
-      + "\torders,\n"
-      + "\tlineitem,\n"
-      + "\tsupplier,\n"
-      + "\tnation,\n"
-      + "\tregion\n"
-      + "where\n"
-      + "\tc_custkey = o_custkey\n"
-      + "\tand l_orderkey = o_orderkey\n"
-      + "\tand l_suppkey = s_suppkey\n"
-      + "\tand c_nationkey = s_nationkey\n"
-      + "\tand s_nationkey = n_nationkey\n"
-      + "\tand n_regionkey = r_regionkey\n"
-      + "\tand r_name = 'ASIA'\n"
-      + "\tand o_orderdate >= date '1994-01-01'\n"
-      + "\tand o_orderdate < date '1994-01-01' + interval '1' year\n"
-      + "group by\n"
-      + "\tn_name\n"
-      + "order by\n"
-      + "\trevenue desc limit 10";
-
-  public static final String QUERY6 =
-    "select\n"
-      + "\tsum(l_extendedprice * l_discount) as revenue\n"
-      + "from\n"
-      + "\tlineitem\n"
-      + "where\n"
-      + "\tl_shipdate >= date '1994-01-01'\n"
-      + "\tand l_shipdate < date '1994-01-01' + interval '1' year\n"
-      + "\tand l_discount between .06 - 0.01 and .06 + 0.01\n"
-      + "\tand l_quantity < 24 limit 10";
-
-
   /**
    * Private Constructor.
    */
@@ -194,7 +82,8 @@ public final class Tpch {
 
   private static PCollectionTuple getHTables(final Pipeline pipeline,
                                              final CSVFormat csvFormat,
-                                             final String inputDirectory) {
+                                             final String inputDirectory,
+                                             final String query) {
     final ImmutableMap<String, Schema> hSchemas = ImmutableMap.<String, Schema>builder()
       .put("lineitem", Schemas.LINEITEM_SCHEMA)
       .put("customer", Schemas.CUSTOMER_SCHEMA)
@@ -220,14 +109,28 @@ public final class Tpch {
 
     PCollectionTuple tables = PCollectionTuple.empty(pipeline);
     for (final Map.Entry<String, Schema> tableSchema : hSchemas.entrySet()) {
-      final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl";
-      final PCollection<Row> table = GenericSourceSink.read(pipeline, filePattern)
-        .apply("StringToRow", new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat))
-        .setCoder(tableSchema.getValue().getRowCoder())
-        .setName(tableSchema.getKey());
-      tables = tables.and(new TupleTag<>(tableSchema.getKey()), table);
-
-      LOG.info("FilePattern {} / Tables {}", filePattern, tables);
+      final String tableName = tableSchema.getKey();
+
+      final List<String> tokens = Arrays.asList(query.split(" "));
+      LOG.info("Tokens are {}", tokens);
+
+      if (tokens.contains(tableName)) {
+        LOG.info("HIT: tablename {}", tableName);
+
+        final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl";
+        final PCollection<Row> table = GenericSourceSink.read(pipeline, filePattern)
+          .apply("StringToRow", new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat))
+          .setCoder(tableSchema.getValue().getRowCoder())
+          .setName(tableSchema.getKey());
+        tables = tables.and(new TupleTag<>(tableSchema.getKey()), table);
+
+        LOG.info("FilePattern {} / Tables {}", filePattern, tables);
+      }
+
+
+
+
+
     }
     return tables;
   }
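
For reference, applying a token check like the one in the loop above to the Q6 text that used to live in the deleted QUERY6 constant keeps only lineitem. A small usage sketch, reusing the illustrative UsedTableFilter helper from the snippet near the top of this mail (again an assumption, not code from Tpch.java; the candidate table list is simply the standard TPC-H tables):

    import java.util.Arrays;
    import java.util.Set;

    /** Hypothetical demo; assumes the UsedTableFilter sketch above is on the classpath. */
    public final class Q6FilterDemo {
      public static void main(final String[] args) {
        final String q6 =
          "select sum(l_extendedprice * l_discount) as revenue "
            + "from lineitem "
            + "where l_shipdate >= date '1994-01-01' "
            + "and l_shipdate < date '1994-01-01' + interval '1' year "
            + "and l_discount between .06 - 0.01 and .06 + 0.01 "
            + "and l_quantity < 24 limit 10";

        final Set<String> used = UsedTableFilter.usedTables(
          q6,
          Arrays.asList("lineitem", "customer", "orders", "supplier",
                        "nation", "region", "part", "partsupp"));

        // Prints [lineitem]: only lineitem.tbl would be wired into the
        // PCollectionTuple for this query.
        System.out.println(used);
      }
    }
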
@@ -241,13 +144,6 @@ public final class Tpch {
     final String inputDirectory = args[1];
     final String outputFilePath = args[2];
 
-    final Map<Integer, String> idToQuery = new HashMap<>();
-    idToQuery.put(1, QUERY1);
-    idToQuery.put(3, QUERY3);
-    idToQuery.put(4, QUERY4);
-    idToQuery.put(5, QUERY5);
-    idToQuery.put(6, QUERY6);
-
     LOG.info("{} / {} / {}", queryFilePath, inputDirectory, outputFilePath);
 
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
@@ -255,15 +151,16 @@ public final class Tpch {
     options.setJobName("TPC-H");
     final Pipeline p = Pipeline.create(options);
 
+    final String queryString = getQueryString(queryFilePath);
     // Create tables
     final CSVFormat csvFormat = CSVFormat.MYSQL
       .withDelimiter('|')
       .withNullString("")
       .withTrailingDelimiter();
-    final PCollectionTuple tables = getHTables(p, csvFormat, inputDirectory);
+    final PCollectionTuple tables = getHTables(p, csvFormat, inputDirectory, queryString);
 
     // Run the TPC-H query
-    final PCollection<Row> result = tables.apply(SqlTransform.query(getQueryString(queryFilePath)));
+    final PCollection<Row> result = tables.apply(SqlTransform.query(queryString));
 
     final PCollection<String> resultToWrite = result.apply(MapElements.into(TypeDescriptors.strings()).via(
       new SerializableFunction<Row, String>() {