You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/04 17:09:48 UTC
[5/7] beam git commit: refactor BeamRecord, BeamRecordType,
BeamSqlRecordType, BeamRecordCoder
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
index ea5f749..06dce91 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -56,7 +56,7 @@ public class BeamSqlPositionExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
String targetStr = opValueEvaluated(0, inputRow);
String containingStr = opValueEvaluated(1, inputRow);
int from = -1;
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
index 25f205a..f8582aa 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -54,7 +54,7 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
String str = opValueEvaluated(0, inputRow);
int idx = opValueEvaluated(1, inputRow);
int startIdx = idx;
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
index 9493e24..9c2a7ae 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.fun.SqlTrimFunction;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -58,7 +58,7 @@ public class BeamSqlTrimExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
if (operands.size() == 1) {
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
opValueEvaluated(0, inputRow).toString().trim());
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
index 9769c0e..94ac2e2 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -32,7 +32,7 @@ public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.VARCHAR);
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
String str = opValueEvaluated(0, inputRow);
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
index dd01a87..b421bc3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.adapter.java.JavaTypeFactory;
@@ -107,7 +107,7 @@ public class BeamQueryPlanner {
* which is linked with the given {@code pipeline}. The final output stream is returned as
* {@code PCollection} so more operations can be applied.
*/
- public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline
+ public PCollection<BeamRecord> compileBeamPipeline(String sqlStatement, Pipeline basePipeline
, BeamSqlEnv sqlEnv) throws Exception {
BeamRelNode relNode = convertToBeamRel(sqlStatement);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index 8e78684..d91b484 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -19,13 +19,12 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.util.ArrayList;
import java.util.List;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
@@ -34,6 +33,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -55,7 +55,7 @@ import org.joda.time.Duration;
*/
public class BeamAggregationRel extends Aggregate implements BeamRelNode {
private int windowFieldIdx = -1;
- private WindowFn<BeamSqlRow, BoundedWindow> windowFn;
+ private WindowFn<BeamRecord, BoundedWindow> windowFn;
private Trigger trigger;
private Duration allowedLatence = Duration.ZERO;
@@ -71,12 +71,12 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
}
@Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
, BeamSqlEnv sqlEnv) throws Exception {
RelNode input = getInput();
String stageName = BeamSqlRelUtils.getStageName(this) + "_";
- PCollection<BeamSqlRow> upstream =
+ PCollection<BeamRecord> upstream =
BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
if (windowFieldIdx != -1) {
upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps
@@ -84,14 +84,14 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
.setCoder(upstream.getCoder());
}
- PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window",
+ PCollection<BeamRecord> windowStream = upstream.apply(stageName + "window",
Window.into(windowFn)
.triggering(trigger)
.withAllowedLateness(allowedLatence)
.accumulatingFiredPanes());
- BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
- PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply(
+ BeamRecordCoder keyCoder = exKeyFieldsSchema(input.getRowType()).getRecordCoder();
+ PCollection<KV<BeamRecord, BeamRecord>> exCombineByStream = windowStream.apply(
stageName + "exCombineBy",
WithKeys
.of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
@@ -99,19 +99,19 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
.setCoder(KvCoder.of(keyCoder, upstream.getCoder()));
- BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
+ BeamRecordCoder aggCoder = exAggFieldsSchema().getRecordCoder();
- PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply(
+ PCollection<KV<BeamRecord, BeamRecord>> aggregatedStream = exCombineByStream.apply(
stageName + "combineBy",
- Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
+ Combine.<BeamRecord, BeamRecord, BeamRecord>perKey(
new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(),
CalciteUtils.toBeamRowType(input.getRowType()))))
.setCoder(KvCoder.of(keyCoder, aggCoder));
- PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
+ PCollection<BeamRecord> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), windowFieldIdx)));
- mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+ mergedStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
return mergedStream;
}
@@ -119,8 +119,8 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
/**
* Type of the sub-record used as Group-By keys.
*/
- private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) {
- BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType);
+ private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) {
+ BeamSqlRecordType inputRowType = CalciteUtils.toBeamRowType(relDataType);
List<String> fieldNames = new ArrayList<>();
List<Integer> fieldTypes = new ArrayList<>();
for (int i : groupSet.asList()) {
@@ -129,13 +129,13 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
fieldTypes.add(inputRowType.getFieldsType().get(i));
}
}
- return BeamSqlRowType.create(fieldNames, fieldTypes);
+ return BeamSqlRecordType.create(fieldNames, fieldTypes);
}
/**
* Type of the sub-record that represents the list of aggregation fields.
*/
- private BeamSqlRowType exAggFieldsSchema() {
+ private BeamSqlRecordType exAggFieldsSchema() {
List<String> fieldNames = new ArrayList<>();
List<Integer> fieldTypes = new ArrayList<>();
for (AggregateCall ac : getAggCallList()) {
@@ -143,7 +143,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
}
- return BeamSqlRowType.create(fieldNames, fieldTypes);
+ return BeamSqlRecordType.create(fieldNames, fieldTypes);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
index b453db4..8fe5be4 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
@@ -22,9 +22,8 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExec
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlFilterFn;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.plan.RelOptCluster;
@@ -50,19 +49,19 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
}
@Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
, BeamSqlEnv sqlEnv) throws Exception {
RelNode input = getInput();
String stageName = BeamSqlRelUtils.getStageName(this);
- PCollection<BeamSqlRow> upstream =
+ PCollection<BeamRecord> upstream =
BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
- PCollection<BeamSqlRow> filterStream = upstream.apply(stageName,
+ PCollection<BeamRecord> filterStream = upstream.apply(stageName,
ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor)));
- filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+ filterStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
return filterStream;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
index d5eb210..1e3eb4c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -21,7 +21,7 @@ import com.google.common.base.Joiner;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.plan.RelOptCluster;
@@ -55,12 +55,12 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
* which is the persisted PCollection.
*/
@Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
, BeamSqlEnv sqlEnv) throws Exception {
RelNode input = getInput();
String stageName = BeamSqlRelUtils.getStageName(this);
- PCollection<BeamSqlRow> upstream =
+ PCollection<BeamRecord> upstream =
BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index 5179eba..254f990 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -21,8 +21,7 @@ import com.google.common.base.Joiner;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
@@ -42,21 +41,21 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
}
@Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
, BeamSqlEnv sqlEnv) throws Exception {
String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
- TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<>(sourceName);
+ TupleTag<BeamRecord> sourceTupleTag = new TupleTag<>(sourceName);
if (inputPCollections.has(sourceTupleTag)) {
//choose PCollection from input PCollectionTuple if exists there.
- PCollection<BeamSqlRow> sourceStream = inputPCollections
- .get(new TupleTag<BeamSqlRow>(sourceName));
+ PCollection<BeamRecord> sourceStream = inputPCollections
+ .get(new TupleTag<BeamRecord>(sourceName));
return sourceStream;
} else {
//If not, the source PCollection is provided with BaseBeamTable.buildIOReader().
BaseBeamTable sourceTable = sqlEnv.findTable(sourceName);
return sourceTable.buildIOReader(inputPCollections.getPipeline())
- .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+ .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
index d6ab52d..5919329 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.plan.RelOptCluster;
@@ -51,7 +51,7 @@ public class BeamIntersectRel extends Intersect implements BeamRelNode {
return new BeamIntersectRel(getCluster(), traitSet, inputs, all);
}
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
, BeamSqlEnv sqlEnv) throws Exception {
return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 2de2a89..9e5ce2f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -27,14 +27,13 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -93,15 +92,15 @@ public class BeamJoinRel extends Join implements BeamRelNode {
joinType);
}
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+ @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections,
BeamSqlEnv sqlEnv)
throws Exception {
BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
- BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
- PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+ BeamSqlRecordType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
+ PCollection<BeamRecord> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
- PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+ PCollection<BeamRecord> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
String stageName = BeamSqlRelUtils.getStageName(this);
WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
@@ -119,24 +118,24 @@ public class BeamJoinRel extends Join implements BeamRelNode {
names.add("c" + i);
types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
}
- BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types);
+ BeamSqlRecordType extractKeyRowType = BeamSqlRecordType.create(names, types);
- Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType);
+ Coder extractKeyRowCoder = extractKeyRowType.getRecordCoder();
// BeamRecord -> KV<BeamRecord, BeamRecord>
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows
+ PCollection<KV<BeamRecord, BeamRecord>> extractedLeftRows = leftRows
.apply(stageName + "_left_ExtractJoinFields",
MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs)))
.setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows
+ PCollection<KV<BeamRecord, BeamRecord>> extractedRightRows = rightRows
.apply(stageName + "_right_ExtractJoinFields",
MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs)))
.setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
// prepare the NullRows
- BeamSqlRow leftNullRow = buildNullRow(leftRelNode);
- BeamSqlRow rightNullRow = buildNullRow(rightRelNode);
+ BeamRecord leftNullRow = buildNullRow(leftRelNode);
+ BeamRecord rightNullRow = buildNullRow(rightRelNode);
// a regular join
if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
@@ -184,11 +183,11 @@ public class BeamJoinRel extends Join implements BeamRelNode {
}
}
- private PCollection<BeamSqlRow> standardJoin(
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
- BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) {
- PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedRows = null;
+ private PCollection<BeamRecord> standardJoin(
+ PCollection<KV<BeamRecord, BeamRecord>> extractedLeftRows,
+ PCollection<KV<BeamRecord, BeamRecord>> extractedRightRows,
+ BeamRecord leftNullRow, BeamRecord rightNullRow, String stageName) {
+ PCollection<KV<BeamRecord, KV<BeamRecord, BeamRecord>>> joinedRows = null;
switch (joinType) {
case LEFT:
joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
@@ -210,53 +209,53 @@ public class BeamJoinRel extends Join implements BeamRelNode {
break;
}
- PCollection<BeamSqlRow> ret = joinedRows
+ PCollection<BeamRecord> ret = joinedRows
.apply(stageName + "_JoinParts2WholeRow",
MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
- .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+ .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
return ret;
}
- public PCollection<BeamSqlRow> sideInputJoin(
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
- BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) {
+ public PCollection<BeamRecord> sideInputJoin(
+ PCollection<KV<BeamRecord, BeamRecord>> extractedLeftRows,
+ PCollection<KV<BeamRecord, BeamRecord>> extractedRightRows,
+ BeamRecord leftNullRow, BeamRecord rightNullRow) {
// we always make the Unbounded table on the left to do the sideInput join
// (will convert the result accordingly before return)
boolean swapped = (extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED);
JoinRelType realJoinType =
(swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : joinType;
- PCollection<KV<BeamSqlRow, BeamSqlRow>> realLeftRows =
+ PCollection<KV<BeamRecord, BeamRecord>> realLeftRows =
swapped ? extractedRightRows : extractedLeftRows;
- PCollection<KV<BeamSqlRow, BeamSqlRow>> realRightRows =
+ PCollection<KV<BeamRecord, BeamRecord>> realRightRows =
swapped ? extractedLeftRows : extractedRightRows;
- BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow;
+ BeamRecord realRightNullRow = swapped ? leftNullRow : rightNullRow;
// swapped still needs to be passed down because we need to swap the result back.
return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows,
realRightNullRow, swapped);
}
- private PCollection<BeamSqlRow> sideInputJoinHelper(
+ private PCollection<BeamRecord> sideInputJoinHelper(
JoinRelType joinType,
- PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows,
- PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows,
- BeamSqlRow rightNullRow, boolean swapped) {
- final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> rowsView = rightRows
- .apply(View.<BeamSqlRow, BeamSqlRow>asMultimap());
+ PCollection<KV<BeamRecord, BeamRecord>> leftRows,
+ PCollection<KV<BeamRecord, BeamRecord>> rightRows,
+ BeamRecord rightNullRow, boolean swapped) {
+ final PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> rowsView = rightRows
+ .apply(View.<BeamRecord, BeamRecord>asMultimap());
- PCollection<BeamSqlRow> ret = leftRows
+ PCollection<BeamRecord> ret = leftRows
.apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(
joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView))
- .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+ .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
return ret;
}
- private BeamSqlRow buildNullRow(BeamRelNode relNode) {
- BeamSqlRowType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
- BeamSqlRow nullRow = new BeamSqlRow(leftType);
+ private BeamRecord buildNullRow(BeamRelNode relNode) {
+ BeamSqlRecordType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
+ BeamRecord nullRow = new BeamRecord(leftType);
for (int i = 0; i < leftType.size(); i++) {
nullRow.addField(i, null);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
index 0075d3a..b55252a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.plan.RelOptCluster;
@@ -49,7 +49,7 @@ public class BeamMinusRel extends Minus implements BeamRelNode {
return new BeamMinusRel(getCluster(), traitSet, inputs, all);
}
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
, BeamSqlEnv sqlEnv) throws Exception {
return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
index 6ccb156..b1ff629 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
@@ -23,9 +23,8 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExec
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlProjectFn;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.plan.RelOptCluster;
@@ -60,20 +59,20 @@ public class BeamProjectRel extends Project implements BeamRelNode {
}
@Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
, BeamSqlEnv sqlEnv) throws Exception {
RelNode input = getInput();
String stageName = BeamSqlRelUtils.getStageName(this);
- PCollection<BeamSqlRow> upstream =
+ PCollection<BeamRecord> upstream =
BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
- PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo
+ PCollection<BeamRecord> projectStream = upstream.apply(stageName, ParDo
.of(new BeamSqlProjectFn(getRelTypeName(), executor,
CalciteUtils.toBeamRowType(rowType))));
- projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+ projectStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
return projectStream;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
index 8a51cc7..b8b4293 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
@@ -18,7 +18,7 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.rel.RelNode;
@@ -33,6 +33,6 @@ public interface BeamRelNode extends RelNode {
* {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search)
* algorithm.
*/
- PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
+ PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
throws Exception;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
index 44e4338..f9cbf4f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
@@ -22,13 +22,13 @@ import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -62,11 +62,11 @@ public class BeamSetOperatorRelBase {
this.all = all;
}
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
, BeamSqlEnv sqlEnv) throws Exception {
- PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
+ PCollection<BeamRecord> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
.buildBeamPipeline(inputPCollections, sqlEnv);
- PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
+ PCollection<BeamRecord> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
.buildBeamPipeline(inputPCollections, sqlEnv);
WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
@@ -77,20 +77,20 @@ public class BeamSetOperatorRelBase {
+ leftWindow + " VS " + rightWindow);
}
- final TupleTag<BeamSqlRow> leftTag = new TupleTag<>();
- final TupleTag<BeamSqlRow> rightTag = new TupleTag<>();
+ final TupleTag<BeamRecord> leftTag = new TupleTag<>();
+ final TupleTag<BeamRecord> rightTag = new TupleTag<>();
// co-group
String stageName = BeamSqlRelUtils.getStageName(beamRelNode);
- PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple
+ PCollection<KV<BeamRecord, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple
.of(leftTag, leftRows.apply(
stageName + "_CreateLeftIndex", MapElements.via(
new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
.and(rightTag, rightRows.apply(
stageName + "_CreateRightIndex", MapElements.via(
new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
- .apply(CoGroupByKey.<BeamSqlRow>create());
- PCollection<BeamSqlRow> ret = coGbkResultCollection
+ .apply(CoGroupByKey.<BeamRecord>create());
+ PCollection<BeamRecord> ret = coGbkResultCollection
.apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag,
opType, all)));
return ret;
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index 4ea12ca..0cbea5c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -27,13 +27,13 @@ import java.util.List;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.plan.RelOptCluster;
@@ -120,10 +120,10 @@ public class BeamSortRel extends Sort implements BeamRelNode {
}
}
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
, BeamSqlEnv sqlEnv) throws Exception {
RelNode input = getInput();
- PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input)
+ PCollection<BeamRecord> upstream = BeamSqlRelUtils.getBeamRelInput(input)
.buildBeamPipeline(inputPCollections, sqlEnv);
Type windowType = upstream.getWindowingStrategy().getWindowFn()
.getWindowTypeDescriptor().getType();
@@ -135,21 +135,21 @@ public class BeamSortRel extends Sort implements BeamRelNode {
BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation,
nullsFirst);
// first find the top (offset + count)
- PCollection<List<BeamSqlRow>> rawStream =
+ PCollection<List<BeamRecord>> rawStream =
upstream.apply("extractTopOffsetAndFetch",
Top.of(startIndex + count, comparator).withoutDefaults())
- .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
+ .setCoder(ListCoder.<BeamRecord>of(upstream.getCoder()));
// strip the `leading offset`
if (startIndex > 0) {
rawStream = rawStream.apply("stripLeadingOffset", ParDo.of(
- new SubListFn<BeamSqlRow>(startIndex, startIndex + count)))
- .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
+ new SubListFn<BeamRecord>(startIndex, startIndex + count)))
+ .setCoder(ListCoder.<BeamRecord>of(upstream.getCoder()));
}
- PCollection<BeamSqlRow> orderedStream = rawStream.apply(
- "flatten", Flatten.<BeamSqlRow>iterables());
- orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+ PCollection<BeamRecord> orderedStream = rawStream.apply(
+ "flatten", Flatten.<BeamRecord>iterables());
+ orderedStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
return orderedStream;
}
@@ -174,7 +174,7 @@ public class BeamSortRel extends Sort implements BeamRelNode {
return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch);
}
- private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable {
+ private static class BeamSqlRowComparator implements Comparator<BeamRecord>, Serializable {
private List<Integer> fieldsIndices;
private List<Boolean> orientation;
private List<Boolean> nullsFirst;
@@ -187,11 +187,12 @@ public class BeamSortRel extends Sort implements BeamRelNode {
this.nullsFirst = nullsFirst;
}
- @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) {
+ @Override public int compare(BeamRecord row1, BeamRecord row2) {
for (int i = 0; i < fieldsIndices.size(); i++) {
int fieldIndex = fieldsIndices.get(i);
int fieldRet = 0;
- SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex);
+ SqlTypeName fieldType = CalciteUtils.getFieldType(
+ BeamSqlRecordHelper.getSqlRecordType(row1), fieldIndex);
// whether NULL should be ordered first or last(compared to non-null values) depends on
// what user specified in SQL(NULLS FIRST/NULLS LAST)
if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
@@ -203,28 +204,16 @@ public class BeamSortRel extends Sort implements BeamRelNode {
} else {
switch (fieldType) {
case TINYINT:
- fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex));
- break;
case SMALLINT:
- fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex));
- break;
case INTEGER:
- fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex));
- break;
case BIGINT:
- fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex));
- break;
case FLOAT:
- fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex));
- break;
case DOUBLE:
- fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex));
- break;
case VARCHAR:
- fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex));
- break;
case DATE:
- fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex));
+ Comparable v1 = (Comparable) row1.getFieldValue(fieldIndex);
+ Comparable v2 = (Comparable) row2.getFieldValue(fieldIndex);
+ fieldRet = v1.compareTo(v2);
break;
default:
throw new UnsupportedOperationException(
@@ -241,7 +230,7 @@ public class BeamSortRel extends Sort implements BeamRelNode {
}
}
- public static <T extends Number & Comparable> int numberCompare(T a, T b) {
+ public static <T extends Comparable> int compare(T a, T b) {
return a.compareTo(b);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
index d35fa67..63ebdf3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
@@ -20,8 +20,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.plan.RelOptCluster;
@@ -81,7 +81,7 @@ public class BeamUnionRel extends Union implements BeamRelNode {
return new BeamUnionRel(getCluster(), traitSet, inputs, all);
}
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
, BeamSqlEnv sqlEnv) throws Exception {
return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index f12cbbc..8ad6e8d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -23,11 +23,10 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.plan.RelOptCluster;
@@ -56,17 +55,17 @@ public class BeamValuesRel extends Values implements BeamRelNode {
}
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
, BeamSqlEnv sqlEnv) throws Exception {
- List<BeamSqlRow> rows = new ArrayList<>(tuples.size());
+ List<BeamRecord> rows = new ArrayList<>(tuples.size());
String stageName = BeamSqlRelUtils.getStageName(this);
if (tuples.isEmpty()) {
throw new IllegalStateException("Values with empty tuples!");
}
- BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
+ BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
for (ImmutableList<RexLiteral> tuple : tuples) {
- BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
+ BeamRecord row = new BeamRecord(beamSQLRowType);
for (int i = 0; i < tuple.size(); i++) {
BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
}
@@ -74,6 +73,6 @@ public class BeamValuesRel extends Values implements BeamRelNode {
}
return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
- .setCoder(new BeamSqlRowCoder(beamSQLRowType));
+ .setCoder(beamSQLRowType.getRecordCoder());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
index 095875f..dab79a2 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
@@ -35,13 +35,14 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.schema.impl.AggregateFunctionImpl;
@@ -56,12 +57,12 @@ public class BeamAggregationTransforms implements Serializable{
/**
* Merge KV to single record.
*/
- public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
- private BeamSqlRowType outRowType;
+ public static class MergeAggregationRecord extends DoFn<KV<BeamRecord, BeamRecord>, BeamRecord> {
+ private BeamSqlRecordType outRowType;
private List<String> aggFieldNames;
private int windowStartFieldIdx;
- public MergeAggregationRecord(BeamSqlRowType outRowType, List<AggregateCall> aggList
+ public MergeAggregationRecord(BeamSqlRecordType outRowType, List<AggregateCall> aggList
, int windowStartFieldIdx) {
this.outRowType = outRowType;
this.aggFieldNames = new ArrayList<>();
@@ -73,10 +74,10 @@ public class BeamAggregationTransforms implements Serializable{
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
- BeamSqlRow outRecord = new BeamSqlRow(outRowType);
+ BeamRecord outRecord = new BeamRecord(outRowType);
outRecord.updateWindowRange(c.element().getKey(), window);
- KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element();
+ KV<BeamRecord, BeamRecord> kvRecord = c.element();
for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
outRecord.addField(f, kvRecord.getKey().getFieldValue(f));
}
@@ -95,7 +96,7 @@ public class BeamAggregationTransforms implements Serializable{
* extract group-by fields.
*/
public static class AggregationGroupByKeyFn
- implements SerializableFunction<BeamSqlRow, BeamSqlRow> {
+ implements SerializableFunction<BeamRecord, BeamRecord> {
private List<Integer> groupByKeys;
public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) {
@@ -108,9 +109,9 @@ public class BeamAggregationTransforms implements Serializable{
}
@Override
- public BeamSqlRow apply(BeamSqlRow input) {
- BeamSqlRowType typeOfKey = exTypeOfKeyRecord(input.getDataType());
- BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey);
+ public BeamRecord apply(BeamRecord input) {
+ BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input));
+ BeamRecord keyOfRecord = new BeamRecord(typeOfKey);
keyOfRecord.updateWindowRange(input, null);
for (int idx = 0; idx < groupByKeys.size(); ++idx) {
@@ -119,21 +120,21 @@ public class BeamAggregationTransforms implements Serializable{
return keyOfRecord;
}
- private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) {
+ private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) {
List<String> fieldNames = new ArrayList<>();
List<Integer> fieldTypes = new ArrayList<>();
for (int idx : groupByKeys) {
fieldNames.add(dataType.getFieldsName().get(idx));
fieldTypes.add(dataType.getFieldsType().get(idx));
}
- return BeamSqlRowType.create(fieldNames, fieldTypes);
+ return BeamSqlRecordType.create(fieldNames, fieldTypes);
}
}
/**
* Assign event timestamp.
*/
- public static class WindowTimestampFn implements SerializableFunction<BeamSqlRow, Instant> {
+ public static class WindowTimestampFn implements SerializableFunction<BeamRecord, Instant> {
private int windowFieldIdx = -1;
public WindowTimestampFn(int windowFieldIdx) {
@@ -142,7 +143,7 @@ public class BeamAggregationTransforms implements Serializable{
}
@Override
- public Instant apply(BeamSqlRow input) {
+ public Instant apply(BeamRecord input) {
return new Instant(input.getDate(windowFieldIdx).getTime());
}
}
@@ -151,13 +152,13 @@ public class BeamAggregationTransforms implements Serializable{
* An adaptor class to invoke Calcite UDAF instances in Beam {@code CombineFn}.
*/
public static class AggregationAdaptor
- extends CombineFn<BeamSqlRow, AggregationAccumulator, BeamSqlRow> {
+ extends CombineFn<BeamRecord, AggregationAccumulator, BeamRecord> {
private List<BeamSqlUdaf> aggregators;
private List<BeamSqlExpression> sourceFieldExps;
- private BeamSqlRowType finalRowType;
+ private BeamSqlRecordType finalRowType;
public AggregationAdaptor(List<AggregateCall> aggregationCalls,
- BeamSqlRowType sourceRowType) {
+ BeamSqlRecordType sourceRowType) {
aggregators = new ArrayList<>();
sourceFieldExps = new ArrayList<>();
List<String> outFieldsName = new ArrayList<>();
@@ -206,7 +207,7 @@ public class BeamAggregationTransforms implements Serializable{
break;
}
}
- finalRowType = BeamSqlRowType.create(outFieldsName, outFieldsType);
+ finalRowType = BeamSqlRecordType.create(outFieldsName, outFieldsType);
}
@Override
public AggregationAccumulator createAccumulator() {
@@ -217,7 +218,7 @@ public class BeamAggregationTransforms implements Serializable{
return initialAccu;
}
@Override
- public AggregationAccumulator addInput(AggregationAccumulator accumulator, BeamSqlRow input) {
+ public AggregationAccumulator addInput(AggregationAccumulator accumulator, BeamRecord input) {
AggregationAccumulator deltaAcc = new AggregationAccumulator();
for (int idx = 0; idx < aggregators.size(); ++idx) {
deltaAcc.accumulatorElements.add(
@@ -240,8 +241,8 @@ public class BeamAggregationTransforms implements Serializable{
return deltaAcc;
}
@Override
- public BeamSqlRow extractOutput(AggregationAccumulator accumulator) {
- BeamSqlRow result = new BeamSqlRow(finalRowType);
+ public BeamRecord extractOutput(AggregationAccumulator accumulator) {
+ BeamRecord result = new BeamRecord(finalRowType);
for (int idx = 0; idx < aggregators.size(); ++idx) {
result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
}
@@ -249,7 +250,7 @@ public class BeamAggregationTransforms implements Serializable{
}
@Override
public Coder<AggregationAccumulator> getAccumulatorCoder(
- CoderRegistry registry, Coder<BeamSqlRow> inputCoder)
+ CoderRegistry registry, Coder<BeamRecord> inputCoder)
throws CannotProvideCoderException {
registry.registerCoderForClass(BigDecimal.class, BigDecimalCoder.of());
List<Coder> aggAccuCoderList = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index e0898d1..105bbf3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -22,10 +22,11 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.calcite.rel.core.JoinRelType;
@@ -40,7 +41,7 @@ public class BeamJoinTransforms {
* A {@code SimpleFunction} to extract join fields from the specified row.
*/
public static class ExtractJoinFields
- extends SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
+ extends SimpleFunction<BeamRecord, KV<BeamRecord, BeamRecord>> {
private final boolean isLeft;
private final List<Pair<Integer, Integer>> joinColumns;
@@ -49,7 +50,7 @@ public class BeamJoinTransforms {
this.joinColumns = joinColumns;
}
- @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
+ @Override public KV<BeamRecord, BeamRecord> apply(BeamRecord input) {
// build the type
// the name of the join field is not important
List<String> names = new ArrayList<>(joinColumns.size());
@@ -57,13 +58,15 @@ public class BeamJoinTransforms {
for (int i = 0; i < joinColumns.size(); i++) {
names.add("c" + i);
types.add(isLeft
- ? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) :
- input.getDataType().getFieldsType().get(joinColumns.get(i).getValue()));
+ ? BeamSqlRecordHelper.getSqlRecordType(input).getFieldsType()
+ .get(joinColumns.get(i).getKey())
+ : BeamSqlRecordHelper.getSqlRecordType(input).getFieldsType()
+ .get(joinColumns.get(i).getValue()));
}
- BeamSqlRowType type = BeamSqlRowType.create(names, types);
+ BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
// build the row
- BeamSqlRow row = new BeamSqlRow(type);
+ BeamRecord row = new BeamRecord(type);
for (int i = 0; i < joinColumns.size(); i++) {
row.addField(i, input
.getFieldValue(isLeft ? joinColumns.get(i).getKey() : joinColumns.get(i).getValue()));
@@ -76,14 +79,14 @@ public class BeamJoinTransforms {
/**
* A {@code DoFn} which implement the sideInput-JOIN.
*/
- public static class SideInputJoinDoFn extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
- private final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView;
+ public static class SideInputJoinDoFn extends DoFn<KV<BeamRecord, BeamRecord>, BeamRecord> {
+ private final PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> sideInputView;
private final JoinRelType joinType;
- private final BeamSqlRow rightNullRow;
+ private final BeamRecord rightNullRow;
private final boolean swap;
- public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow,
- PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView,
+ public SideInputJoinDoFn(JoinRelType joinType, BeamRecord rightNullRow,
+ PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> sideInputView,
boolean swap) {
this.joinType = joinType;
this.rightNullRow = rightNullRow;
@@ -92,13 +95,13 @@ public class BeamJoinTransforms {
}
@ProcessElement public void processElement(ProcessContext context) {
- BeamSqlRow key = context.element().getKey();
- BeamSqlRow leftRow = context.element().getValue();
- Map<BeamSqlRow, Iterable<BeamSqlRow>> key2Rows = context.sideInput(sideInputView);
- Iterable<BeamSqlRow> rightRowsIterable = key2Rows.get(key);
+ BeamRecord key = context.element().getKey();
+ BeamRecord leftRow = context.element().getValue();
+ Map<BeamRecord, Iterable<BeamRecord>> key2Rows = context.sideInput(sideInputView);
+ Iterable<BeamRecord> rightRowsIterable = key2Rows.get(key);
if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) {
- Iterator<BeamSqlRow> it = rightRowsIterable.iterator();
+ Iterator<BeamRecord> it = rightRowsIterable.iterator();
while (it.hasNext()) {
context.output(combineTwoRowsIntoOne(leftRow, it.next(), swap));
}
@@ -115,11 +118,11 @@ public class BeamJoinTransforms {
* A {@code SimpleFunction} to combine two rows into one.
*/
public static class JoinParts2WholeRow
- extends SimpleFunction<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>, BeamSqlRow> {
- @Override public BeamSqlRow apply(KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> input) {
- KV<BeamSqlRow, BeamSqlRow> parts = input.getValue();
- BeamSqlRow leftRow = parts.getKey();
- BeamSqlRow rightRow = parts.getValue();
+ extends SimpleFunction<KV<BeamRecord, KV<BeamRecord, BeamRecord>>, BeamRecord> {
+ @Override public BeamRecord apply(KV<BeamRecord, KV<BeamRecord, BeamRecord>> input) {
+ KV<BeamRecord, BeamRecord> parts = input.getValue();
+ BeamRecord leftRow = parts.getKey();
+ BeamRecord rightRow = parts.getValue();
return combineTwoRowsIntoOne(leftRow, rightRow, false);
}
}
@@ -127,8 +130,8 @@ public class BeamJoinTransforms {
/**
* As the method name suggests: combine two rows into one wide row.
*/
- private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow,
- BeamSqlRow rightRow, boolean swap) {
+ private static BeamRecord combineTwoRowsIntoOne(BeamRecord leftRow,
+ BeamRecord rightRow, boolean swap) {
if (swap) {
return combineTwoRowsIntoOneHelper(rightRow, leftRow);
} else {
@@ -139,19 +142,19 @@ public class BeamJoinTransforms {
/**
* As the method name suggests: combine two rows into one wide row.
*/
- private static BeamSqlRow combineTwoRowsIntoOneHelper(BeamSqlRow leftRow,
- BeamSqlRow rightRow) {
+ private static BeamRecord combineTwoRowsIntoOneHelper(BeamRecord leftRow,
+ BeamRecord rightRow) {
// build the type
List<String> names = new ArrayList<>(leftRow.size() + rightRow.size());
names.addAll(leftRow.getDataType().getFieldsName());
names.addAll(rightRow.getDataType().getFieldsName());
List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size());
- types.addAll(leftRow.getDataType().getFieldsType());
- types.addAll(rightRow.getDataType().getFieldsType());
- BeamSqlRowType type = BeamSqlRowType.create(names, types);
+ types.addAll(BeamSqlRecordHelper.getSqlRecordType(leftRow).getFieldsType());
+ types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldsType());
+ BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
- BeamSqlRow row = new BeamSqlRow(type);
+ BeamRecord row = new BeamRecord(type);
// build the row
for (int i = 0; i < leftRow.size(); i++) {
row.addField(i, leftRow.getFieldValue(i));
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
index 326b328..33ac807 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
@@ -20,10 +20,10 @@ package org.apache.beam.sdk.extensions.sql.impl.transform;
import java.util.Iterator;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSetOperatorRelBase;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
@@ -35,8 +35,8 @@ public abstract class BeamSetOperatorsTransforms {
* Transform a {@code BeamSqlRow} to a {@code KV<BeamSqlRow, BeamSqlRow>}.
*/
public static class BeamSqlRow2KvFn extends
- SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
- @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
+ SimpleFunction<BeamRecord, KV<BeamRecord, BeamRecord>> {
+ @Override public KV<BeamRecord, BeamRecord> apply(BeamRecord input) {
return KV.of(input, input);
}
}
@@ -45,14 +45,14 @@ public abstract class BeamSetOperatorsTransforms {
* Filter function used for Set operators.
*/
public static class SetOperatorFilteringDoFn extends
- DoFn<KV<BeamSqlRow, CoGbkResult>, BeamSqlRow> {
- private TupleTag<BeamSqlRow> leftTag;
- private TupleTag<BeamSqlRow> rightTag;
+ DoFn<KV<BeamRecord, CoGbkResult>, BeamRecord> {
+ private TupleTag<BeamRecord> leftTag;
+ private TupleTag<BeamRecord> rightTag;
private BeamSetOperatorRelBase.OpType opType;
// ALL?
private boolean all;
- public SetOperatorFilteringDoFn(TupleTag<BeamSqlRow> leftTag, TupleTag<BeamSqlRow> rightTag,
+ public SetOperatorFilteringDoFn(TupleTag<BeamRecord> leftTag, TupleTag<BeamRecord> rightTag,
BeamSetOperatorRelBase.OpType opType, boolean all) {
this.leftTag = leftTag;
this.rightTag = rightTag;
@@ -62,13 +62,13 @@ public abstract class BeamSetOperatorsTransforms {
@ProcessElement public void processElement(ProcessContext ctx) {
CoGbkResult coGbkResult = ctx.element().getValue();
- Iterable<BeamSqlRow> leftRows = coGbkResult.getAll(leftTag);
- Iterable<BeamSqlRow> rightRows = coGbkResult.getAll(rightTag);
+ Iterable<BeamRecord> leftRows = coGbkResult.getAll(leftTag);
+ Iterable<BeamRecord> rightRows = coGbkResult.getAll(rightTag);
switch (opType) {
case UNION:
if (all) {
// output both left & right
- Iterator<BeamSqlRow> iter = leftRows.iterator();
+ Iterator<BeamRecord> iter = leftRows.iterator();
while (iter.hasNext()) {
ctx.output(iter.next());
}
@@ -84,7 +84,7 @@ public abstract class BeamSetOperatorsTransforms {
case INTERSECT:
if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) {
if (all) {
- for (BeamSqlRow leftRow : leftRows) {
+ for (BeamRecord leftRow : leftRows) {
ctx.output(leftRow);
}
} else {
@@ -94,7 +94,7 @@ public abstract class BeamSetOperatorsTransforms {
break;
case MINUS:
if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) {
- Iterator<BeamSqlRow> iter = leftRows.iterator();
+ Iterator<BeamRecord> iter = leftRows.iterator();
if (all) {
// output all
while (iter.hasNext()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
index 855de7a..31efeb7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
@@ -20,14 +20,14 @@ package org.apache.beam.sdk.extensions.sql.impl.transform;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.BeamRecord;
/**
* {@code BeamSqlFilterFn} is the executor for a {@link BeamFilterRel} step.
*
*/
-public class BeamSqlFilterFn extends DoFn<BeamSqlRow, BeamSqlRow> {
+public class BeamSqlFilterFn extends DoFn<BeamRecord, BeamRecord> {
private String stepName;
private BeamSqlExpressionExecutor executor;
@@ -45,7 +45,7 @@ public class BeamSqlFilterFn extends DoFn<BeamSqlRow, BeamSqlRow> {
@ProcessElement
public void processElement(ProcessContext c) {
- BeamSqlRow in = c.element();
+ BeamRecord in = c.element();
List<Object> result = executor.execute(in);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java
index b40cfa6..f97a90a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java
@@ -17,14 +17,14 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.transform;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.BeamRecord;
/**
* A test PTransform to display output in console.
*
*/
-public class BeamSqlOutputToConsoleFn extends DoFn<BeamSqlRow, Void> {
+public class BeamSqlOutputToConsoleFn extends DoFn<BeamRecord, Void> {
private String stepName;
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
index b3f7ce5..a95c743 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
@@ -20,24 +20,24 @@ package org.apache.beam.sdk.extensions.sql.impl.transform;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
/**
*
* {@code BeamSqlProjectFn} is the executor for a {@link BeamProjectRel} step.
*
*/
-public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
+public class BeamSqlProjectFn extends DoFn<BeamRecord, BeamRecord> {
private String stepName;
private BeamSqlExpressionExecutor executor;
- private BeamSqlRowType outputRowType;
+ private BeamSqlRecordType outputRowType;
public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor,
- BeamSqlRowType outputRowType) {
+ BeamSqlRecordType outputRowType) {
super();
this.stepName = stepName;
this.executor = executor;
@@ -51,10 +51,10 @@ public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
- BeamSqlRow inputRow = c.element();
+ BeamRecord inputRow = c.element();
List<Object> results = executor.execute(inputRow);
- BeamSqlRow outRow = new BeamSqlRow(outputRowType);
+ BeamRecord outRow = new BeamRecord(outputRowType);
outRow.updateWindowRange(inputRow, window);
for (int idx = 0; idx < results.size(); ++idx) {
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index b80e045..bf96e85 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
@@ -78,27 +78,27 @@ public class CalciteUtils {
/**
* Get the {@code SqlTypeName} for the specified column of a table.
*/
- public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) {
+ public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) {
return toCalciteType(schema.getFieldsType().get(index));
}
/**
* Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table.
*/
- public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) {
+ public static BeamSqlRecordType toBeamRowType(RelDataType tableInfo) {
List<String> fieldNames = new ArrayList<>();
List<Integer> fieldTypes = new ArrayList<>();
for (RelDataTypeField f : tableInfo.getFieldList()) {
fieldNames.add(f.getName());
fieldTypes.add(toJavaType(f.getType().getSqlTypeName()));
}
- return BeamSqlRowType.create(fieldNames, fieldTypes);
+ return BeamSqlRecordType.create(fieldNames, fieldTypes);
}
/**
* Create an instance of {@code RelDataType} so it can be used to create a table.
*/
- public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) {
+ public static RelProtoDataType toCalciteRowType(final BeamSqlRecordType that) {
return new RelProtoDataType() {
@Override
public RelDataType apply(RelDataTypeFactory a) {
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
index bf41c95..68b120e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
@@ -23,12 +23,12 @@ import java.io.Serializable;
* Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
*/
public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
- protected BeamSqlRowType beamSqlRowType;
- public BaseBeamTable(BeamSqlRowType beamSqlRowType) {
+ protected BeamSqlRecordType beamSqlRowType;
+ public BaseBeamTable(BeamSqlRecordType beamSqlRowType) {
this.beamSqlRowType = beamSqlRowType;
}
- @Override public BeamSqlRowType getRowType() {
+ @Override public BeamSqlRecordType getRowType() {
return beamSqlRowType;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
index 5bbb8fd..68905b5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.schema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PDone;
@@ -29,14 +30,14 @@ import org.apache.beam.sdk.values.PDone;
*/
public class BeamPCollectionTable extends BaseBeamTable {
private BeamIOType ioType;
- private transient PCollection<BeamSqlRow> upstream;
+ private transient PCollection<BeamRecord> upstream;
- protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) {
+ protected BeamPCollectionTable(BeamSqlRecordType beamSqlRowType) {
super(beamSqlRowType);
}
- public BeamPCollectionTable(PCollection<BeamSqlRow> upstream,
- BeamSqlRowType beamSqlRowType){
+ public BeamPCollectionTable(PCollection<BeamRecord> upstream,
+ BeamSqlRecordType beamSqlRowType){
this(beamSqlRowType);
ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
@@ -49,12 +50,12 @@ public class BeamPCollectionTable extends BaseBeamTable {
}
@Override
- public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+ public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
return upstream;
}
@Override
- public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+ public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target");
}