You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/06/14 00:08:50 UTC
[1/2] beam git commit: register table for both BeamSql.simpleQuery
and BeamSql.query
Repository: beam
Updated Branches:
refs/heads/DSL_SQL d52df7471 -> 1e080e2ba
register table for both BeamSql.simpleQuery and BeamSql.query
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ce59eec7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ce59eec7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ce59eec7
Branch: refs/heads/DSL_SQL
Commit: ce59eec7f88dfcbbdb16a0db420e0ce541b47468
Parents: d52df74
Author: mingmxu <mi...@ebay.com>
Authored: Sat Jun 10 13:46:17 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jun 13 17:06:27 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/dsls/sql/BeamSql.java | 12 +++++++---
.../beam/dsls/sql/example/BeamSqlExample.java | 24 ++++++++++++++++----
2 files changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ce59eec7/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
index 809fed3..ae281ac 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -110,6 +110,15 @@ public class BeamSql {
@Override
public PCollection<BeamSqlRow> expand(PCollectionTuple input) {
+ //register tables
+ for (TupleTag<?> sourceTag : input.getAll().keySet()) {
+ PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag);
+ BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder();
+
+ BeamSqlEnv.registerTable(sourceTag.getId(),
+ new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema().toRelDataType()));
+ }
+
BeamRelNode beamRelNode = null;
try {
beamRelNode = BeamSqlEnv.planner.convertToBeamRel(sqlQuery);
@@ -149,13 +158,10 @@ public class BeamSql {
} catch (SqlParseException e) {
throw new IllegalStateException(e);
}
- BeamSqlRowCoder inputCoder = (BeamSqlRowCoder) input.getCoder();
if (sqlNode instanceof SqlSelect) {
SqlSelect select = (SqlSelect) sqlNode;
String tableName = select.getFrom().toString();
- BeamSqlEnv.registerTable(tableName,
- new BeamPCollectionTable(input, inputCoder.getTableSchema().toRelDataType()));
return PCollectionTuple.of(new TupleTag<BeamSqlRow>(tableName), input)
.apply(BeamSql.query(sqlQuery));
} else {
http://git-wip-us.apache.org/repos/asf/beam/blob/ce59eec7/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 4d7328e..36e1aa9 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -29,6 +29,8 @@ import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.calcite.sql.type.SqlTypeName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,16 +60,30 @@ public class BeamSqlExample {
PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
.withCoder(new BeamSqlRowCoder(type)));
- //run a simple SQL query over input PCollection;
- String sql = "select c2, c3 from TABLE_A where c1=1";
- PCollection<BeamSqlRow> outputStream = inputTable.apply(BeamSql.simpleQuery(sql));
+ //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
+ PCollection<BeamSqlRow> outputStream = inputTable.apply(
+ BeamSql.simpleQuery("select c2, c3 from TABLE_A where c1=1"));
//log out the output record;
outputStream.apply("log_result",
MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
+ public Void apply(BeamSqlRow input) {
+ System.out.println("TABLE_A: " + input);
+ return null;
+ }
+ }));
+
+ //Case 2. run the query with BeamSql.query
+ PCollection<BeamSqlRow> outputStream2 =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_B"), inputTable)
+ .apply(BeamSql.query("select c2, c3 from TABLE_B where c1=1"));
+
+ //log out the output record;
+ outputStream2.apply("log_result",
+ MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
@Override
public Void apply(BeamSqlRow input) {
- LOG.info(input.valueInString());
+ System.out.println("TABLE_B: " + input);
return null;
}
}));
[2/2] beam git commit: [BEAM-2436] table is not regeisted in
BeamSql.query
Posted by lc...@apache.org.
[BEAM-2436] table is not regeisted in BeamSql.query
This closes #3342
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1e080e2b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1e080e2b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1e080e2b
Branch: refs/heads/DSL_SQL
Commit: 1e080e2ba25515faf5ff30832e7a4f38df4110ea
Parents: d52df74 ce59eec
Author: Luke Cwik <lc...@google.com>
Authored: Tue Jun 13 17:07:14 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jun 13 17:07:14 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/dsls/sql/BeamSql.java | 12 +++++++---
.../beam/dsls/sql/example/BeamSqlExample.java | 24 ++++++++++++++++----
2 files changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------