You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/09 20:17:24 UTC
[1/2] beam git commit: Update BeamSqlExample: - Fix mvn example - Add
aggregation/group by
Repository: beam
Updated Branches:
refs/heads/DSL_SQL f59dccc51 -> 3e25ffb04
Update BeamSqlExample:
- Fix mvn example
- Add aggregation/group by
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee1f97bb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee1f97bb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee1f97bb
Branch: refs/heads/DSL_SQL
Commit: ee1f97bb4bf5baa1810a787d0b0e68f314e7c110
Parents: f59dccc
Author: mingmxu <mi...@ebay.com>
Authored: Tue Aug 8 21:52:54 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Aug 9 09:50:37 2017 -0700
----------------------------------------------------------------------
.../extensions/sql/example/BeamSqlExample.java | 24 ++++++++++++--------
1 file changed, 15 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ee1f97bb/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index 3a46acc..91251cf 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -39,11 +39,10 @@ import org.apache.beam.sdk.values.TupleTag;
*
* <p>Run the example with
* <pre>
- * mvn -pl dsls/sql compile exec:java \
- * -Dexec.mainClass=BeamSqlExample \
+ * mvn -pl sdks/java/extensions/sql \
+ * compile exec:java -Dexec.mainClass=org.apache.beam.sdk.extensions.sql.example.BeamSqlExample \
* -Dexec.args="--runner=DirectRunner" -Pdirect-runner
* </pre>
- *
*/
class BeamSqlExample {
public static void main(String[] args) throws Exception {
@@ -54,21 +53,26 @@ class BeamSqlExample {
List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
- BeamRecord row = new BeamRecord(type, 1, "row", 1.0);
+ BeamRecord row1 = new BeamRecord(type, 1, "row", 1.0);
+ BeamRecord row2 = new BeamRecord(type, 2, "row", 2.0);
+ BeamRecord row3 = new BeamRecord(type, 3, "row", 3.0);
//create a source PCollection with Create.of();
- PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row)
+ PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row1, row2, row3)
.withCoder(type.getRecordCoder()));
//Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
PCollection<BeamRecord> outputStream = inputTable.apply(
- BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1"));
+ BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1 > 1"));
//print the output record of case 1;
outputStream.apply("log_result",
MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() {
public Void apply(BeamRecord input) {
- System.out.println("PCOLLECTION: " + input);
+ //expect output:
+ // PCOLLECTION: [3, row, 3.0]
+ // PCOLLECTION: [2, row, 2.0]
+ System.out.println("PCOLLECTION: " + input.getDataValues());
return null;
}
}));
@@ -76,14 +80,16 @@ class BeamSqlExample {
//Case 2. run the query with BeamSql.query over result PCollection of case 1.
PCollection<BeamRecord> outputStream2 =
PCollectionTuple.of(new TupleTag<BeamRecord>("CASE1_RESULT"), outputStream)
- .apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1"));
+ .apply(BeamSql.query("select c2, sum(c3) from CASE1_RESULT group by c2"));
//print the output record of case 2;
outputStream2.apply("log_result",
MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() {
@Override
public Void apply(BeamRecord input) {
- System.out.println("TABLE_B: " + input);
+ //expect output:
+ // CASE1_RESULT: [row, 5.0]
+ System.out.println("CASE1_RESULT: " + input.getDataValues());
return null;
}
}));
[2/2] beam git commit: [BEAM-2749] This closes #3706
Posted by ta...@apache.org.
[BEAM-2749] This closes #3706
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3e25ffb0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3e25ffb0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3e25ffb0
Branch: refs/heads/DSL_SQL
Commit: 3e25ffb04e3190a67174bbe3883c3ca982840663
Parents: f59dccc ee1f97b
Author: Tyler Akidau <ta...@apache.org>
Authored: Wed Aug 9 09:52:51 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Aug 9 09:52:51 2017 -0700
----------------------------------------------------------------------
.../extensions/sql/example/BeamSqlExample.java | 24 ++++++++++++--------
1 file changed, 15 insertions(+), 9 deletions(-)
----------------------------------------------------------------------