You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/10/17 01:13:26 UTC
[incubator-nemo] 09/14: load only the used tables
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch tpch-fix
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit 5ff7062aee00d5a8ec74935e2d4330da71398127
Author: John Yang <jo...@apache.org>
AuthorDate: Thu Sep 13 10:25:36 2018 +0900
load only the used tables
---
.../org/apache/nemo/examples/beam/tpch/Tpch.java | 159 ++++-----------------
1 file changed, 28 insertions(+), 131 deletions(-)
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
index 16a9297..bf1393c 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
@@ -38,10 +38,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -54,115 +51,6 @@ import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beam
public final class Tpch {
private static final Logger LOG = LoggerFactory.getLogger(Tpch.class.getName());
- public static final String QUERY1 =
- "select\n"
- + "\tl_returnflag,\n"
- + "\tl_linestatus,\n"
- + "\tsum(l_quantity) as sum_qty,\n"
- + "\tsum(l_extendedprice) as sum_base_price,\n"
- + "\tsum(l_extendedprice * (1 - l_discount)) as sum_disc_price,\n"
- + "\tsum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,\n"
- + "\tavg(l_quantity) as avg_qty,\n"
- + "\tavg(l_extendedprice) as avg_price,\n"
- + "\tavg(l_discount) as avg_disc,\n"
- + "\tcount(*) as count_order\n"
- + "from\n"
- + "\tlineitem\n"
- + "where\n"
- + "\tl_shipdate <= date '1998-12-01' - interval '90' day (3)\n"
- + "group by\n"
- + "\tl_returnflag,\n"
- + "\tl_linestatus\n"
- + "order by\n"
- + "\tl_returnflag,\n"
- + "\tl_linestatus limit 10";
-
- public static final String QUERY3 =
- "select\n"
- + "\tl_orderkey,\n"
- + "\tsum(l_extendedprice * (1 - l_discount)) as revenue,\n"
- + "\to_orderdate,\n"
- + "\to_shippriority\n"
- + "from\n"
- + "\tcustomer,\n"
- + "\torders,\n"
- + "\tlineitem\n"
- + "where\n"
- + "\tc_mktsegment = 'BUILDING'\n"
- + "\tand c_custkey = o_custkey\n"
- + "\tand l_orderkey = o_orderkey\n"
- + "\tand o_orderdate < date '1995-03-15'\n"
- + "\tand l_shipdate > date '1995-03-15'\n"
- + "group by\n"
- + "\tl_orderkey,\n"
- + "\to_orderdate,\n"
- + "\to_shippriority\n"
- + "order by\n"
- + "\trevenue desc,\n"
- + "\to_orderdate\n"
- + "limit 10";
-
- public static final String QUERY4 =
- "select\n"
- + "\to_orderpriority,\n"
- + "\tcount(*) as order_count\n"
- + "from\n"
- + "\torders\n"
- + "where\n"
- + "\to_orderdate >= date '1993-07-01'\n"
- + "\tand o_orderdate < date '1993-07-01' + interval '3' month\n"
- + "\tand exists (\n"
- + "\t\tselect\n"
- + "\t\t\t*\n"
- + "\t\tfrom\n"
- + "\t\t\tlineitem\n"
- + "\t\twhere\n"
- + "\t\t\tl_orderkey = o_orderkey\n"
- + "\t\t\tand l_commitdate < l_receiptdate\n"
- + "\t)\n"
- + "group by\n"
- + "\to_orderpriority\n"
- + "order by\n"
- + "\to_orderpriority limit 10";
-
- public static final String QUERY5 =
- "select\n"
- + "\tn_name,\n"
- + "\tsum(l_extendedprice * (1 - l_discount)) as revenue\n"
- + "from\n"
- + "\tcustomer,\n"
- + "\torders,\n"
- + "\tlineitem,\n"
- + "\tsupplier,\n"
- + "\tnation,\n"
- + "\tregion\n"
- + "where\n"
- + "\tc_custkey = o_custkey\n"
- + "\tand l_orderkey = o_orderkey\n"
- + "\tand l_suppkey = s_suppkey\n"
- + "\tand c_nationkey = s_nationkey\n"
- + "\tand s_nationkey = n_nationkey\n"
- + "\tand n_regionkey = r_regionkey\n"
- + "\tand r_name = 'ASIA'\n"
- + "\tand o_orderdate >= date '1994-01-01'\n"
- + "\tand o_orderdate < date '1994-01-01' + interval '1' year\n"
- + "group by\n"
- + "\tn_name\n"
- + "order by\n"
- + "\trevenue desc limit 10";
-
- public static final String QUERY6 =
- "select\n"
- + "\tsum(l_extendedprice * l_discount) as revenue\n"
- + "from\n"
- + "\tlineitem\n"
- + "where\n"
- + "\tl_shipdate >= date '1994-01-01'\n"
- + "\tand l_shipdate < date '1994-01-01' + interval '1' year\n"
- + "\tand l_discount between .06 - 0.01 and .06 + 0.01\n"
- + "\tand l_quantity < 24 limit 10";
-
-
/**
* Private Constructor.
*/
@@ -194,7 +82,8 @@ public final class Tpch {
private static PCollectionTuple getHTables(final Pipeline pipeline,
final CSVFormat csvFormat,
- final String inputDirectory) {
+ final String inputDirectory,
+ final String query) {
final ImmutableMap<String, Schema> hSchemas = ImmutableMap.<String, Schema>builder()
.put("lineitem", Schemas.LINEITEM_SCHEMA)
.put("customer", Schemas.CUSTOMER_SCHEMA)
@@ -220,14 +109,28 @@ public final class Tpch {
PCollectionTuple tables = PCollectionTuple.empty(pipeline);
for (final Map.Entry<String, Schema> tableSchema : hSchemas.entrySet()) {
- final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl";
- final PCollection<Row> table = GenericSourceSink.read(pipeline, filePattern)
- .apply("StringToRow", new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat))
- .setCoder(tableSchema.getValue().getRowCoder())
- .setName(tableSchema.getKey());
- tables = tables.and(new TupleTag<>(tableSchema.getKey()), table);
-
- LOG.info("FilePattern {} / Tables {}", filePattern, tables);
+ final String tableName = tableSchema.getKey();
+
+ final List<String> tokens = Arrays.asList(query.split(" "));
+ LOG.info("Tokens are {}", tokens);
+
+ if (tokens.contains(tableName)) {
+ LOG.info("HIT: tablename {}", tableName);
+
+ final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl";
+ final PCollection<Row> table = GenericSourceSink.read(pipeline, filePattern)
+ .apply("StringToRow", new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat))
+ .setCoder(tableSchema.getValue().getRowCoder())
+ .setName(tableSchema.getKey());
+ tables = tables.and(new TupleTag<>(tableSchema.getKey()), table);
+
+ LOG.info("FilePattern {} / Tables {}", filePattern, tables);
+ }
+
+
+
+
+
}
return tables;
}
@@ -241,13 +144,6 @@ public final class Tpch {
final String inputDirectory = args[1];
final String outputFilePath = args[2];
- final Map<Integer, String> idToQuery = new HashMap<>();
- idToQuery.put(1, QUERY1);
- idToQuery.put(3, QUERY3);
- idToQuery.put(4, QUERY4);
- idToQuery.put(5, QUERY5);
- idToQuery.put(6, QUERY6);
-
LOG.info("{} / {} / {}", queryFilePath, inputDirectory, outputFilePath);
final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
@@ -255,15 +151,16 @@ public final class Tpch {
options.setJobName("TPC-H");
final Pipeline p = Pipeline.create(options);
+ final String queryString = getQueryString(queryFilePath);
// Create tables
final CSVFormat csvFormat = CSVFormat.MYSQL
.withDelimiter('|')
.withNullString("")
.withTrailingDelimiter();
- final PCollectionTuple tables = getHTables(p, csvFormat, inputDirectory);
+ final PCollectionTuple tables = getHTables(p, csvFormat, inputDirectory, queryString);
// Run the TPC-H query
- final PCollection<Row> result = tables.apply(SqlTransform.query(getQueryString(queryFilePath)));
+ final PCollection<Row> result = tables.apply(SqlTransform.query(queryString));
final PCollection<String> resultToWrite = result.apply(MapElements.into(TypeDescriptors.strings()).via(
new SerializableFunction<Row, String>() {