You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/05/21 15:26:19 UTC

[1/2] beam git commit: [BEAM-2292] Add BeamPCollectionTable to create table from PCollection<BeamSQLRow>

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 25babc999 -> 127790212


[BEAM-2292] Add BeamPCollectionTable to create table from PCollection<BeamSQLRow>


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1232bf11
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1232bf11
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1232bf11

Branch: refs/heads/DSL_SQL
Commit: 1232bf11f2c7585f68f8f87ded30dee6a45b4fa5
Parents: 25babc9
Author: mingmxu <mi...@ebay.com>
Authored: Sun May 14 13:22:30 2017 -0700
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun May 21 16:55:56 2017 +0200

----------------------------------------------------------------------
 .../beam/dsls/sql/rel/BeamIOSourceRel.java      |  3 +-
 .../beam/dsls/sql/schema/BaseBeamTable.java     |  6 +-
 .../dsls/sql/schema/BeamPCollectionTable.java   | 62 +++++++++++++++++++
 .../dsls/sql/schema/kafka/BeamKafkaTable.java   | 12 +---
 .../dsls/sql/schema/text/BeamTextCSVTable.java  |  8 ++-
 .../schema/text/BeamTextCSVTableIOReader.java   |  9 +--
 .../dsls/sql/planner/MockedBeamSQLTable.java    |  5 +-
 .../sql/schema/BeamPCollectionTableTest.java    | 64 ++++++++++++++++++++
 .../sql/schema/text/BeamTextCSVTableTest.java   | 12 ++--
 9 files changed, 150 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index 61f53eb..f4d5001 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -49,8 +49,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
 
     String stageName = BeamSQLRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> sourceStream = planCreator.getPipeline().apply(stageName,
-        sourceTable.buildIOReader());
+    PCollection<BeamSQLRow> sourceStream = sourceTable.buildIOReader(planCreator.getPipeline());
 
     return sourceStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
index 2ecfa38..52d2bbd 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
@@ -19,8 +19,8 @@ package org.apache.beam.dsls.sql.schema;
 
 import java.io.Serializable;
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.calcite.DataContext;
@@ -53,10 +53,10 @@ public abstract class BaseBeamTable implements ScannableTable, Serializable {
   public abstract BeamIOType getSourceType();
 
   /**
-   * create a {@code IO.read()} instance to read from source.
+   * create a {@code PCollection<BeamSQLRow>} from source.
    *
    */
-  public abstract PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader();
+  public abstract PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline);
 
   /**
    * create a {@code IO.write()} instance to write to target.

http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
new file mode 100644
index 0000000..1c3ab5b
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.rel.type.RelProtoDataType;
+
+/**
+ * {@code BeamPCollectionTable} exposes a {@code PCollection<BeamSQLRow>} as a virtual table,
+ * so that a downstream query can read from it directly.
+ */
+public class BeamPCollectionTable extends BaseBeamTable {
+  private BeamIOType ioType;
+  private PCollection<BeamSQLRow> upstream;
+
+  protected BeamPCollectionTable(RelProtoDataType protoRowType) {
+    super(protoRowType);
+  }
+
+  public BeamPCollectionTable(PCollection<BeamSQLRow> upstream, RelProtoDataType protoRowType){
+    this(protoRowType);
+    ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
+        ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
+    this.upstream = upstream;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return ioType;
+  }
+
+  @Override
+  public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
+    return upstream;
+  }
+
+  @Override
+  public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+    throw new BeamInvalidOperatorException("cannot use [BeamPCollectionTable] as target");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
index c8c851c..7342cee 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -72,19 +73,12 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
       getPTransformForOutput();
 
   @Override
-  public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
-    return new PTransform<PBegin, PCollection<BeamSQLRow>>() {
-
-      @Override
-      public PCollection<BeamSQLRow> expand(PBegin input) {
-        return input.apply("read",
+  public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
+    return PBegin.in(pipeline).apply("read",
             KafkaIO.<byte[], byte[]>read().withBootstrapServers(bootstrapServers).withTopics(topics)
                 .updateConsumerProperties(configUpdates).withKeyCoder(ByteArrayCoder.of())
                 .withValueCoder(ByteArrayCoder.of()).withoutMetadata())
             .apply("in_format", getPTransformForInput());
-
-      }
-    };
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
index b9e6b81..6b21289 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
@@ -19,6 +19,8 @@
 package org.apache.beam.dsls.sql.schema.text;
 
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -55,8 +57,10 @@ public class BeamTextCSVTable extends BeamTextTable {
   }
 
   @Override
-  public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
-    return new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat);
+  public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
+    return PBegin.in(pipeline).apply("decodeRecord", TextIO.Read.from(filePattern))
+        .apply("parseCSVLine",
+            new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
index 3c031ce..59d77c0 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -24,11 +24,9 @@ import java.io.Serializable;
 
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.csv.CSVFormat;
 
@@ -36,7 +34,7 @@ import org.apache.commons.csv.CSVFormat;
  * IOReader for {@code BeamTextCSVTable}.
  */
 public class BeamTextCSVTableIOReader
-    extends PTransform<PBegin, PCollection<BeamSQLRow>>
+    extends PTransform<PCollection<String>, PCollection<BeamSQLRow>>
     implements Serializable {
   private String filePattern;
   protected BeamSQLRecordType beamSqlRecordType;
@@ -50,9 +48,8 @@ public class BeamTextCSVTableIOReader
   }
 
   @Override
-  public PCollection<BeamSQLRow> expand(PBegin input) {
-    return input.apply("decodeRecord", TextIO.Read.from(filePattern))
-        .apply(ParDo.of(new DoFn<String, BeamSQLRow>() {
+  public PCollection<BeamSQLRow> expand(PCollection<String> input) {
+    return input.apply(ParDo.of(new DoFn<String, BeamSQLRow>() {
           @ProcessElement
           public void processElement(ProcessContext ctx) {
             String str = ctx.element();

http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
index 8ccb332..78fd055 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
@@ -24,6 +24,7 @@ import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -119,8 +120,8 @@ public class MockedBeamSQLTable extends BaseBeamTable {
   }
 
   @Override
-  public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
-    return Create.of(inputRecords);
+  public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
+    return PBegin.in(pipeline).apply(Create.of(inputRecords));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
new file mode 100644
index 0000000..6f24e2a
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import org.apache.beam.dsls.sql.planner.BasePlanner;
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test case for BeamPCollectionTable.
+ */
+public class BeamPCollectionTableTest extends BasePlanner{
+  public static TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void prepareTable(){
+    RelProtoDataType protoRowType = new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder().add("c1", SqlTypeName.INTEGER)
+            .add("c2", SqlTypeName.VARCHAR).build();
+      }
+    };
+
+    BeamSQLRow row = new BeamSQLRow(BeamSQLRecordType.from(
+        protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)));
+    row.addField(0, 1);
+    row.addField(1, "hello world.");
+    PCollection<BeamSQLRow> inputStream = PBegin.in(pipeline).apply(Create.of(row));
+    runner.addTableMetadata("COLLECTION_TABLE",
+        new BeamPCollectionTable(inputStream, protoRowType));
+  }
+
+  @Test
+  public void testSelectFromPCollectionTable() throws Exception{
+    String sql = "select c1, c2 from COLLECTION_TABLE";
+    runner.executionPlan(sql);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
index 3bc29e4..4c403ac 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
@@ -80,22 +80,20 @@ public class BeamTextCSVTableTest {
   private static File writerTargetFile;
 
   @Test public void testBuildIOReader() {
-    PCollection<BeamSQLRow> rows = pipeline.apply(
-        new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader());
+    PCollection<BeamSQLRow> rows = new BeamTextCSVTable(buildRowType(),
+        readerSourceFile.getAbsolutePath()).buildIOReader(pipeline);
     PAssert.that(rows).containsInAnyOrder(testDataRows);
     pipeline.run();
   }
 
   @Test public void testBuildIOWriter() {
-    // reader from a source file, then write into a target file
-    pipeline.apply(
-        new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader())
+    new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader(pipeline)
         .apply(new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath())
             .buildIOWriter());
     pipeline.run();
 
-    PCollection<BeamSQLRow> rows = pipeline2.apply(
-        new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath()).buildIOReader());
+    PCollection<BeamSQLRow> rows = new BeamTextCSVTable(buildRowType(),
+        writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2);
 
     // confirm the two reads match
     PAssert.that(rows).containsInAnyOrder(testDataRows);


[2/2] beam git commit: [BEAM-2292] This closes #3138

Posted by jb...@apache.org.
[BEAM-2292] This closes #3138


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/12779021
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/12779021
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/12779021

Branch: refs/heads/DSL_SQL
Commit: 127790212b8ec3b3b9e0c29117bce594e7549d05
Parents: 25babc9 1232bf1
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun May 21 17:26:12 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun May 21 17:26:12 2017 +0200

----------------------------------------------------------------------
 .../beam/dsls/sql/rel/BeamIOSourceRel.java      |  3 +-
 .../beam/dsls/sql/schema/BaseBeamTable.java     |  6 +-
 .../dsls/sql/schema/BeamPCollectionTable.java   | 62 +++++++++++++++++++
 .../dsls/sql/schema/kafka/BeamKafkaTable.java   | 12 +---
 .../dsls/sql/schema/text/BeamTextCSVTable.java  |  8 ++-
 .../schema/text/BeamTextCSVTableIOReader.java   |  9 +--
 .../dsls/sql/planner/MockedBeamSQLTable.java    |  5 +-
 .../sql/schema/BeamPCollectionTableTest.java    | 64 ++++++++++++++++++++
 .../sql/schema/text/BeamTextCSVTableTest.java   | 12 ++--
 9 files changed, 150 insertions(+), 31 deletions(-)
----------------------------------------------------------------------