You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/05/21 15:26:19 UTC
[1/2] beam git commit: [BEAM-2292] Add BeamPCollectionTable to create
table from PCollection<BeamSQLRow>
Repository: beam
Updated Branches:
refs/heads/DSL_SQL 25babc999 -> 127790212
[BEAM-2292] Add BeamPCollectionTable to create table from PCollection<BeamSQLRow>
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1232bf11
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1232bf11
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1232bf11
Branch: refs/heads/DSL_SQL
Commit: 1232bf11f2c7585f68f8f87ded30dee6a45b4fa5
Parents: 25babc9
Author: mingmxu <mi...@ebay.com>
Authored: Sun May 14 13:22:30 2017 -0700
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun May 21 16:55:56 2017 +0200
----------------------------------------------------------------------
.../beam/dsls/sql/rel/BeamIOSourceRel.java | 3 +-
.../beam/dsls/sql/schema/BaseBeamTable.java | 6 +-
.../dsls/sql/schema/BeamPCollectionTable.java | 62 +++++++++++++++++++
.../dsls/sql/schema/kafka/BeamKafkaTable.java | 12 +---
.../dsls/sql/schema/text/BeamTextCSVTable.java | 8 ++-
.../schema/text/BeamTextCSVTableIOReader.java | 9 +--
.../dsls/sql/planner/MockedBeamSQLTable.java | 5 +-
.../sql/schema/BeamPCollectionTableTest.java | 64 ++++++++++++++++++++
.../sql/schema/text/BeamTextCSVTableTest.java | 12 ++--
9 files changed, 150 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index 61f53eb..f4d5001 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -49,8 +49,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
String stageName = BeamSQLRelUtils.getStageName(this);
- PCollection<BeamSQLRow> sourceStream = planCreator.getPipeline().apply(stageName,
- sourceTable.buildIOReader());
+ PCollection<BeamSQLRow> sourceStream = sourceTable.buildIOReader(planCreator.getPipeline());
return sourceStream;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
index 2ecfa38..52d2bbd 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
@@ -19,8 +19,8 @@ package org.apache.beam.dsls.sql.schema;
import java.io.Serializable;
import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.calcite.DataContext;
@@ -53,10 +53,10 @@ public abstract class BaseBeamTable implements ScannableTable, Serializable {
public abstract BeamIOType getSourceType();
/**
- * create a {@code IO.read()} instance to read from source.
+ * create a {@code PCollection<BeamSQLRow>} from source.
*
*/
- public abstract PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader();
+ public abstract PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline);
/**
* create a {@code IO.write()} instance to write to target.
http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
new file mode 100644
index 0000000..1c3ab5b
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.util.Objects;
+import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.rel.type.RelProtoDataType;
+
+/**
+ * {@code BeamPCollectionTable} exposes an existing {@code PCollection<BeamSQLRow>} as a virtual
+ * table, so that a downstream query can select from it directly.
+ *
+ * <p>The table is read-only: {@link #buildIOWriter()} always throws.
+ */
+public class BeamPCollectionTable extends BaseBeamTable {
+  /** Boundedness of the wrapped collection (null if the protected constructor was used). */
+  private BeamIOType ioType;
+  // PCollection is not Serializable while BaseBeamTable is, so keep this handle out of the
+  // serialized form; it is only read by buildIOReader().
+  private transient PCollection<BeamSQLRow> upstream;
+
+  /**
+   * Creates a table from a row type only. The wrapped collection and its boundedness stay
+   * null, so a subclass using this constructor presumably overrides
+   * {@link #getSourceType()} and {@link #buildIOReader(Pipeline)}.
+   */
+  protected BeamPCollectionTable(RelProtoDataType protoRowType) {
+    super(protoRowType);
+  }
+
+  /**
+   * Wraps {@code upstream} as a queryable table.
+   *
+   * @param upstream the collection whose elements are the table's rows; must not be null
+   * @param protoRowType the row type of the elements of {@code upstream}
+   * @throws NullPointerException if {@code upstream} is null
+   */
+  public BeamPCollectionTable(PCollection<BeamSQLRow> upstream, RelProtoDataType protoRowType) {
+    this(protoRowType);
+    this.upstream = Objects.requireNonNull(upstream, "upstream");
+    this.ioType = upstream.isBounded() == IsBounded.BOUNDED
+        ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return ioType;
+  }
+
+  /**
+   * Returns the wrapped collection unchanged; no transform is applied.
+   *
+   * <p>NOTE(review): {@code pipeline} is ignored. The result belongs to the pipeline that
+   * {@code upstream} was built in, so callers presumably must pass that same pipeline.
+   */
+  @Override
+  public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
+    return upstream;
+  }
+
+  /**
+   * Always fails: a {@code BeamPCollectionTable} can only be read from.
+   *
+   * @throws BeamInvalidOperatorException always
+   */
+  @Override
+  public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+    throw new BeamInvalidOperatorException("cannot use [BeamPCollectionTable] as target");
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
index c8c851c..7342cee 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamIOType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
@@ -72,19 +73,12 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
getPTransformForOutput();
@Override
- public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
- return new PTransform<PBegin, PCollection<BeamSQLRow>>() {
-
- @Override
- public PCollection<BeamSQLRow> expand(PBegin input) {
- return input.apply("read",
+ public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
+ return PBegin.in(pipeline).apply("read",
KafkaIO.<byte[], byte[]>read().withBootstrapServers(bootstrapServers).withTopics(topics)
.updateConsumerProperties(configUpdates).withKeyCoder(ByteArrayCoder.of())
.withValueCoder(ByteArrayCoder.of()).withoutMetadata())
.apply("in_format", getPTransformForInput());
-
- }
- };
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
index b9e6b81..6b21289 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
@@ -19,6 +19,8 @@
package org.apache.beam.dsls.sql.schema.text;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -55,8 +57,10 @@ public class BeamTextCSVTable extends BeamTextTable {
}
@Override
- public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
- return new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat);
+ public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
+ return PBegin.in(pipeline).apply("decodeRecord", TextIO.Read.from(filePattern))
+ .apply("parseCSVLine",
+ new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
index 3c031ce..59d77c0 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -24,11 +24,9 @@ import java.io.Serializable;
import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.csv.CSVFormat;
@@ -36,7 +34,7 @@ import org.apache.commons.csv.CSVFormat;
* IOReader for {@code BeamTextCSVTable}.
*/
public class BeamTextCSVTableIOReader
- extends PTransform<PBegin, PCollection<BeamSQLRow>>
+ extends PTransform<PCollection<String>, PCollection<BeamSQLRow>>
implements Serializable {
private String filePattern;
protected BeamSQLRecordType beamSqlRecordType;
@@ -50,9 +48,8 @@ public class BeamTextCSVTableIOReader
}
@Override
- public PCollection<BeamSQLRow> expand(PBegin input) {
- return input.apply("decodeRecord", TextIO.Read.from(filePattern))
- .apply(ParDo.of(new DoFn<String, BeamSQLRow>() {
+ public PCollection<BeamSQLRow> expand(PCollection<String> input) {
+ return input.apply(ParDo.of(new DoFn<String, BeamSQLRow>() {
@ProcessElement
public void processElement(ProcessContext ctx) {
String str = ctx.element();
http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
index 8ccb332..78fd055 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
@@ -24,6 +24,7 @@ import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamIOType;
import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -119,8 +120,8 @@ public class MockedBeamSQLTable extends BaseBeamTable {
}
@Override
- public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
- return Create.of(inputRecords);
+ public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
+ return PBegin.in(pipeline).apply(Create.of(inputRecords));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
new file mode 100644
index 0000000..6f24e2a
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import org.apache.beam.dsls.sql.planner.BasePlanner;
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Checks that a {@link BeamPCollectionTable} can be registered with the planner and queried.
+ */
+public class BeamPCollectionTableTest extends BasePlanner {
+  public static TestPipeline pipeline = TestPipeline.create();
+
+  /** Registers a single-row collection under the table name {@code COLLECTION_TABLE}. */
+  @BeforeClass
+  public static void prepareTable() {
+    RelProtoDataType rowType = new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory typeFactory) {
+        return typeFactory.builder()
+            .add("c1", SqlTypeName.INTEGER)
+            .add("c2", SqlTypeName.VARCHAR)
+            .build();
+      }
+    };
+
+    BeamSQLRow sampleRow =
+        new BeamSQLRow(BeamSQLRecordType.from(rowType.apply(BeamQueryPlanner.TYPE_FACTORY)));
+    sampleRow.addField(0, 1);
+    sampleRow.addField(1, "hello world.");
+
+    PCollection<BeamSQLRow> input = PBegin.in(pipeline).apply(Create.of(sampleRow));
+    runner.addTableMetadata("COLLECTION_TABLE", new BeamPCollectionTable(input, rowType));
+  }
+
+  /** Passes as long as {@code runner.executionPlan} does not throw for a simple projection. */
+  @Test
+  public void testSelectFromPCollectionTable() throws Exception {
+    runner.executionPlan("select c1, c2 from COLLECTION_TABLE");
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
index 3bc29e4..4c403ac 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
@@ -80,22 +80,20 @@ public class BeamTextCSVTableTest {
private static File writerTargetFile;
@Test public void testBuildIOReader() {
- PCollection<BeamSQLRow> rows = pipeline.apply(
- new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader());
+ PCollection<BeamSQLRow> rows = new BeamTextCSVTable(buildRowType(),
+ readerSourceFile.getAbsolutePath()).buildIOReader(pipeline);
PAssert.that(rows).containsInAnyOrder(testDataRows);
pipeline.run();
}
@Test public void testBuildIOWriter() {
- // reader from a source file, then write into a target file
- pipeline.apply(
- new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader())
+ new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader(pipeline)
.apply(new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath())
.buildIOWriter());
pipeline.run();
- PCollection<BeamSQLRow> rows = pipeline2.apply(
- new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath()).buildIOReader());
+ PCollection<BeamSQLRow> rows = new BeamTextCSVTable(buildRowType(),
+ writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2);
// confirm the two reads match
PAssert.that(rows).containsInAnyOrder(testDataRows);
[2/2] beam git commit: [BEAM-2292] This closes #3138
Posted by jb...@apache.org.
[BEAM-2292] This closes #3138
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/12779021
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/12779021
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/12779021
Branch: refs/heads/DSL_SQL
Commit: 127790212b8ec3b3b9e0c29117bce594e7549d05
Parents: 25babc9 1232bf1
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun May 21 17:26:12 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun May 21 17:26:12 2017 +0200
----------------------------------------------------------------------
.../beam/dsls/sql/rel/BeamIOSourceRel.java | 3 +-
.../beam/dsls/sql/schema/BaseBeamTable.java | 6 +-
.../dsls/sql/schema/BeamPCollectionTable.java | 62 +++++++++++++++++++
.../dsls/sql/schema/kafka/BeamKafkaTable.java | 12 +---
.../dsls/sql/schema/text/BeamTextCSVTable.java | 8 ++-
.../schema/text/BeamTextCSVTableIOReader.java | 9 +--
.../dsls/sql/planner/MockedBeamSQLTable.java | 5 +-
.../sql/schema/BeamPCollectionTableTest.java | 64 ++++++++++++++++++++
.../sql/schema/text/BeamTextCSVTableTest.java | 12 ++--
9 files changed, 150 insertions(+), 31 deletions(-)
----------------------------------------------------------------------