You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/04 17:09:49 UTC
[6/7] beam git commit: refactor BeamRecord, BeamRecordType,
BeamSqlRecordType, BeamRecordCoder
refactor BeamRecord, BeamRecordType, BeamSqlRecordType, BeamRecordCoder
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/89109b8c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/89109b8c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/89109b8c
Branch: refs/heads/DSL_SQL
Commit: 89109b8cdc667c4e07529e9748ed4290e88b9282
Parents: 129ae96
Author: mingmxu <mi...@ebay.com>
Authored: Thu Aug 3 12:11:06 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Aug 4 10:08:37 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/coders/BeamRecordCoder.java | 20 +-
.../org/apache/beam/sdk/values/BeamRecord.java | 22 +-
.../apache/beam/sdk/values/BeamRecordType.java | 70 ++++++
.../beam/sdk/values/BeamRecordTypeProvider.java | 59 -----
.../apache/beam/sdk/extensions/sql/BeamSql.java | 22 +-
.../beam/sdk/extensions/sql/BeamSqlCli.java | 8 +-
.../beam/sdk/extensions/sql/BeamSqlEnv.java | 6 +-
.../extensions/sql/example/BeamSqlExample.java | 27 ++-
.../interpreter/BeamSqlExpressionExecutor.java | 6 +-
.../sql/impl/interpreter/BeamSqlFnExecutor.java | 6 +-
.../operator/BeamSqlCaseExpression.java | 4 +-
.../operator/BeamSqlCastExpression.java | 4 +-
.../interpreter/operator/BeamSqlExpression.java | 8 +-
.../operator/BeamSqlInputRefExpression.java | 4 +-
.../interpreter/operator/BeamSqlPrimitive.java | 6 +-
.../operator/BeamSqlReinterpretExpression.java | 4 +-
.../operator/BeamSqlUdfExpression.java | 4 +-
.../operator/BeamSqlWindowEndExpression.java | 4 +-
.../operator/BeamSqlWindowExpression.java | 4 +-
.../operator/BeamSqlWindowStartExpression.java | 4 +-
.../arithmetic/BeamSqlArithmeticExpression.java | 4 +-
.../comparison/BeamSqlCompareExpression.java | 4 +-
.../comparison/BeamSqlIsNotNullExpression.java | 4 +-
.../comparison/BeamSqlIsNullExpression.java | 4 +-
.../date/BeamSqlCurrentDateExpression.java | 4 +-
.../date/BeamSqlCurrentTimeExpression.java | 4 +-
.../date/BeamSqlCurrentTimestampExpression.java | 4 +-
.../date/BeamSqlDateCeilExpression.java | 4 +-
.../date/BeamSqlDateFloorExpression.java | 4 +-
.../operator/date/BeamSqlExtractExpression.java | 4 +-
.../operator/logical/BeamSqlAndExpression.java | 4 +-
.../operator/logical/BeamSqlNotExpression.java | 4 +-
.../operator/logical/BeamSqlOrExpression.java | 4 +-
.../math/BeamSqlMathBinaryExpression.java | 4 +-
.../math/BeamSqlMathUnaryExpression.java | 4 +-
.../operator/math/BeamSqlPiExpression.java | 4 +-
.../operator/math/BeamSqlRandExpression.java | 4 +-
.../math/BeamSqlRandIntegerExpression.java | 4 +-
.../string/BeamSqlCharLengthExpression.java | 4 +-
.../string/BeamSqlConcatExpression.java | 4 +-
.../string/BeamSqlInitCapExpression.java | 4 +-
.../operator/string/BeamSqlLowerExpression.java | 4 +-
.../string/BeamSqlOverlayExpression.java | 4 +-
.../string/BeamSqlPositionExpression.java | 4 +-
.../string/BeamSqlSubstringExpression.java | 4 +-
.../operator/string/BeamSqlTrimExpression.java | 4 +-
.../operator/string/BeamSqlUpperExpression.java | 4 +-
.../sql/impl/planner/BeamQueryPlanner.java | 4 +-
.../sql/impl/rel/BeamAggregationRel.java | 38 ++--
.../extensions/sql/impl/rel/BeamFilterRel.java | 11 +-
.../extensions/sql/impl/rel/BeamIOSinkRel.java | 6 +-
.../sql/impl/rel/BeamIOSourceRel.java | 13 +-
.../sql/impl/rel/BeamIntersectRel.java | 4 +-
.../extensions/sql/impl/rel/BeamJoinRel.java | 75 ++++---
.../extensions/sql/impl/rel/BeamMinusRel.java | 4 +-
.../extensions/sql/impl/rel/BeamProjectRel.java | 11 +-
.../extensions/sql/impl/rel/BeamRelNode.java | 4 +-
.../sql/impl/rel/BeamSetOperatorRelBase.java | 18 +-
.../extensions/sql/impl/rel/BeamSortRel.java | 49 ++---
.../extensions/sql/impl/rel/BeamUnionRel.java | 4 +-
.../extensions/sql/impl/rel/BeamValuesRel.java | 15 +-
.../transform/BeamAggregationTransforms.java | 47 ++--
.../sql/impl/transform/BeamJoinTransforms.java | 65 +++---
.../transform/BeamSetOperatorsTransforms.java | 24 +-
.../sql/impl/transform/BeamSqlFilterFn.java | 6 +-
.../transform/BeamSqlOutputToConsoleFn.java | 4 +-
.../sql/impl/transform/BeamSqlProjectFn.java | 14 +-
.../extensions/sql/impl/utils/CalciteUtils.java | 10 +-
.../extensions/sql/schema/BaseBeamTable.java | 6 +-
.../sql/schema/BeamPCollectionTable.java | 13 +-
.../sql/schema/BeamSqlRecordHelper.java | 217 +++++++++++++++++++
.../sql/schema/BeamSqlRecordType.java | 168 ++++++++++++++
.../sdk/extensions/sql/schema/BeamSqlRow.java | 41 ----
.../extensions/sql/schema/BeamSqlRowCoder.java | 186 ----------------
.../extensions/sql/schema/BeamSqlRowType.java | 109 ----------
.../sdk/extensions/sql/schema/BeamSqlTable.java | 7 +-
.../extensions/sql/schema/BeamTableUtils.java | 14 +-
.../sql/schema/kafka/BeamKafkaCSVTable.java | 38 ++--
.../sql/schema/kafka/BeamKafkaTable.java | 20 +-
.../sql/schema/text/BeamTextCSVTable.java | 12 +-
.../schema/text/BeamTextCSVTableIOReader.java | 14 +-
.../schema/text/BeamTextCSVTableIOWriter.java | 16 +-
.../sql/schema/text/BeamTextTable.java | 4 +-
.../sql/BeamSqlDslAggregationTest.java | 80 +++----
.../beam/sdk/extensions/sql/BeamSqlDslBase.java | 51 +++--
.../extensions/sql/BeamSqlDslFilterTest.java | 26 +--
.../sdk/extensions/sql/BeamSqlDslJoinTest.java | 26 +--
.../extensions/sql/BeamSqlDslProjectTest.java | 64 +++---
.../extensions/sql/BeamSqlDslUdfUdafTest.java | 24 +-
.../beam/sdk/extensions/sql/TestUtils.java | 30 +--
.../interpreter/BeamSqlFnExecutorTestBase.java | 10 +-
.../sql/impl/rel/BeamIntersectRelTest.java | 6 +-
.../rel/BeamJoinRelBoundedVsBoundedTest.java | 10 +-
.../rel/BeamJoinRelUnboundedVsBoundedTest.java | 10 +-
.../BeamJoinRelUnboundedVsUnboundedTest.java | 10 +-
.../sql/impl/rel/BeamMinusRelTest.java | 6 +-
.../impl/rel/BeamSetOperatorRelBaseTest.java | 4 +-
.../sql/impl/rel/BeamSortRelTest.java | 12 +-
.../sql/impl/rel/BeamUnionRelTest.java | 6 +-
.../sql/impl/rel/BeamValuesRelTest.java | 8 +-
.../sdk/extensions/sql/impl/rel/CheckSize.java | 8 +-
...mSqlBuiltinFunctionsIntegrationTestBase.java | 17 +-
...amSqlComparisonOperatorsIntegrationTest.java | 11 +-
.../BeamSqlDateFunctionsIntegrationTest.java | 12 +-
.../extensions/sql/mock/MockedBoundedTable.java | 24 +-
.../sdk/extensions/sql/mock/MockedTable.java | 8 +-
.../sql/mock/MockedUnboundedTable.java | 18 +-
.../sql/schema/BeamSqlRowCoderTest.java | 8 +-
.../sql/schema/kafka/BeamKafkaCSVTableTest.java | 14 +-
.../sql/schema/text/BeamTextCSVTableTest.java | 16 +-
.../transform/BeamAggregationTransformTest.java | 72 +++---
.../schema/transform/BeamTransformBaseTest.java | 18 +-
112 files changed, 1171 insertions(+), 1129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
index 27f92ce..06958a4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
@@ -24,7 +24,7 @@ import java.util.BitSet;
import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.BeamRecordTypeProvider;
+import org.apache.beam.sdk.values.BeamRecordType;
/**
* A {@link Coder} for {@link BeamRecord}. It wraps the {@link Coder} for each element directly.
@@ -34,31 +34,35 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
private static final BitSetCoder nullListCoder = BitSetCoder.of();
private static final InstantCoder instantCoder = InstantCoder.of();
- private BeamRecordTypeProvider recordType;
+ private BeamRecordType recordType;
private List<Coder> coderArray;
- private BeamRecordCoder(BeamRecordTypeProvider recordType, List<Coder> coderArray) {
+ private BeamRecordCoder(BeamRecordType recordType, List<Coder> coderArray) {
this.recordType = recordType;
this.coderArray = coderArray;
}
- public static BeamRecordCoder of(BeamRecordTypeProvider recordType, List<Coder> coderArray){
+ public static BeamRecordCoder of(BeamRecordType recordType, List<Coder> coderArray){
if (recordType.size() != coderArray.size()) {
throw new IllegalArgumentException("Coder size doesn't match with field size");
}
return new BeamRecordCoder(recordType, coderArray);
}
+ public BeamRecordType getRecordType() {
+ return recordType;
+ }
+
@Override
public void encode(BeamRecord value, OutputStream outStream)
throws CoderException, IOException {
nullListCoder.encode(value.getNullFields(), outStream);
for (int idx = 0; idx < value.size(); ++idx) {
- if (value.getNullFields().get(idx)) {
+ if (value.isNull(idx)) {
continue;
}
- coderArray.get(idx).encode(value.getInteger(idx), outStream);
+ coderArray.get(idx).encode(value.getFieldValue(idx), outStream);
}
instantCoder.encode(value.getWindowStart(), outStream);
@@ -70,7 +74,6 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
BitSet nullFields = nullListCoder.decode(inStream);
BeamRecord record = new BeamRecord(recordType);
- record.setNullFields(nullFields);
for (int idx = 0; idx < recordType.size(); ++idx) {
if (nullFields.get(idx)) {
continue;
@@ -88,5 +91,8 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
@Override
public void verifyDeterministic()
throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+ for (Coder c : coderArray) {
+ c.verifyDeterministic();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index 476233e..bac649e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -32,7 +32,7 @@ import org.joda.time.Instant;
/**
* {@link org.apache.beam.sdk.values.BeamRecord}, self-described with
- * {@link BeamRecordTypeProvider}, represents one element in a
+ * {@link BeamRecordType}, represents one element in a
* {@link org.apache.beam.sdk.values.PCollection}.
*/
@Experimental
@@ -40,12 +40,12 @@ public class BeamRecord implements Serializable {
private List<Object> dataValues;
//null values are indexed here, to handle properly in Coder.
private BitSet nullFields;
- private BeamRecordTypeProvider dataType;
+ private BeamRecordType dataType;
private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
- public BeamRecord(BeamRecordTypeProvider dataType) {
+ public BeamRecord(BeamRecordType dataType) {
this.dataType = dataType;
this.nullFields = new BitSet(dataType.size());
this.dataValues = new ArrayList<>();
@@ -55,7 +55,7 @@ public class BeamRecord implements Serializable {
}
}
- public BeamRecord(BeamRecordTypeProvider dataType, List<Object> dataValues) {
+ public BeamRecord(BeamRecordType dataType, List<Object> dataValues) {
this(dataType);
for (int idx = 0; idx < dataValues.size(); ++idx) {
addField(idx, dataValues.get(idx));
@@ -137,10 +137,6 @@ public class BeamRecord implements Serializable {
}
public Object getFieldValue(int fieldIdx) {
- if (nullFields.get(fieldIdx)) {
- return null;
- }
-
return dataValues.get(fieldIdx);
}
@@ -200,22 +196,14 @@ public class BeamRecord implements Serializable {
this.dataValues = dataValues;
}
- public BeamRecordTypeProvider getDataType() {
+ public BeamRecordType getDataType() {
return dataType;
}
- public void setDataType(BeamRecordTypeProvider dataType) {
- this.dataType = dataType;
- }
-
public BitSet getNullFields() {
return nullFields;
}
- public void setNullFields(BitSet nullFields) {
- this.nullFields = nullFields;
- }
-
/**
* is the specified field NULL?
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
new file mode 100644
index 0000000..3b20b50
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
+import org.apache.beam.sdk.coders.Coder;
+
+/**
+ * The default type provider used in {@link BeamRecord}.
+ */
+@Experimental
+public class BeamRecordType implements Serializable{
+ private List<String> fieldsName;
+ private List<Coder> fieldsCoder;
+
+ public BeamRecordType(List<String> fieldsName, List<Coder> fieldsCoder) {
+ this.fieldsName = fieldsName;
+ this.fieldsCoder = fieldsCoder;
+ }
+
+ /**
+ * Validate input fieldValue for a field.
+ * @throws IllegalArgumentException throw exception when the validation fails.
+ */
+ public void validateValueType(int index, Object fieldValue)
+ throws IllegalArgumentException{
+ //do nothing by default.
+ }
+
+ /**
+ * Get the coder for {@link BeamRecordCoder}.
+ */
+ public BeamRecordCoder getRecordCoder(){
+ return BeamRecordCoder.of(this, fieldsCoder);
+ }
+
+ public List<String> getFieldsName(){
+ return fieldsName;
+ }
+
+ public String getFieldByIndex(int index){
+ return fieldsName.get(index);
+ }
+
+ public int findIndexOfField(String fieldName){
+ return fieldsName.indexOf(fieldName);
+ }
+
+ public int size(){
+ return fieldsName.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java
deleted file mode 100644
index 63a961c..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.values;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.sdk.annotations.Experimental;
-
-/**
- * The default type provider used in {@link BeamRecord}.
- */
-@Experimental
-public class BeamRecordTypeProvider implements Serializable{
- private List<String> fieldsName;
-
- public BeamRecordTypeProvider(List<String> fieldsName) {
- this.fieldsName = fieldsName;
- }
-
- /**
- * Validate input fieldValue for a field.
- * @throws IllegalArgumentException throw exception when the validation fails.
- */
- public void validateValueType(int index, Object fieldValue)
- throws IllegalArgumentException{
- //do nothing by default.
- }
-
- public List<String> getFieldsName(){
- return fieldsName;
- }
-
- public String getFieldByIndex(int index){
- return fieldsName.get(index);
- }
-
- public int findIndexOfField(String fieldName){
- return fieldsName.indexOf(fieldName);
- }
-
- public int size(){
- return fieldsName.size();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
index 0dabf40..86e4d8d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
@@ -19,13 +19,14 @@ package org.apache.beam.sdk.extensions.sql;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.schema.BeamPCollectionTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
@@ -111,7 +112,7 @@ public class BeamSql {
*/
@AutoValue
public abstract static class QueryTransform extends
- PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
+ PTransform<PCollectionTuple, PCollection<BeamRecord>> {
abstract BeamSqlEnv getSqlEnv();
abstract String getSqlQuery();
@@ -143,7 +144,7 @@ public class BeamSql {
}
@Override
- public PCollection<BeamSqlRow> expand(PCollectionTuple input) {
+ public PCollection<BeamRecord> expand(PCollectionTuple input) {
registerTables(input);
BeamRelNode beamRelNode = null;
@@ -163,11 +164,12 @@ public class BeamSql {
//register tables, related with input PCollections.
private void registerTables(PCollectionTuple input){
for (TupleTag<?> sourceTag : input.getAll().keySet()) {
- PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag);
- BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder();
+ PCollection<BeamRecord> sourceStream = (PCollection<BeamRecord>) input.get(sourceTag);
+ BeamRecordCoder sourceCoder = (BeamRecordCoder) sourceStream.getCoder();
getSqlEnv().registerTable(sourceTag.getId(),
- new BeamPCollectionTable(sourceStream, sourceCoder.getSqlRecordType()));
+ new BeamPCollectionTable(sourceStream,
+ (BeamSqlRecordType) sourceCoder.getRecordType()));
}
}
}
@@ -178,7 +180,7 @@ public class BeamSql {
*/
@AutoValue
public abstract static class SimpleQueryTransform
- extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
+ extends PTransform<PCollection<BeamRecord>, PCollection<BeamRecord>> {
private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
abstract BeamSqlEnv getSqlEnv();
abstract String getSqlQuery();
@@ -232,9 +234,9 @@ public class BeamSql {
}
@Override
- public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
+ public PCollection<BeamRecord> expand(PCollection<BeamRecord> input) {
validateQuery();
- return PCollectionTuple.of(new TupleTag<BeamSqlRow>(PCOLLECTION_TABLE_NAME), input)
+ return PCollectionTuple.of(new TupleTag<BeamRecord>(PCOLLECTION_TABLE_NAME), input)
.apply(QueryTransform.builder()
.setSqlEnv(getSqlEnv())
.setSqlQuery(getSqlQuery())
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index 967dee5..a43808e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -20,9 +20,9 @@ package org.apache.beam.sdk.extensions.sql;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.calcite.plan.RelOptUtil;
@@ -43,7 +43,7 @@ public class BeamSqlCli {
/**
* compile SQL, and return a {@link Pipeline}.
*/
- public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
+ public static PCollection<BeamRecord> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
throws Exception{
PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
.as(PipelineOptions.class); // FlinkPipelineOptions.class
@@ -56,9 +56,9 @@ public class BeamSqlCli {
/**
* compile SQL, and return a {@link Pipeline}.
*/
- public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline,
+ public static PCollection<BeamRecord> compilePipeline(String sqlStatement, Pipeline basePipeline,
BeamSqlEnv sqlEnv) throws Exception{
- PCollection<BeamSqlRow> resultStream =
+ PCollection<BeamRecord> resultStream =
sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv);
return resultStream;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
index be0b0af..3c5eb36 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
import org.apache.calcite.DataContext;
@@ -84,8 +84,8 @@ public class BeamSqlEnv implements Serializable{
}
private static class BeamCalciteTable implements ScannableTable, Serializable {
- private BeamSqlRowType beamSqlRowType;
- public BeamCalciteTable(BeamSqlRowType beamSqlRowType) {
+ private BeamSqlRecordType beamSqlRowType;
+ public BeamCalciteTable(BeamSqlRecordType beamSqlRowType) {
this.beamSqlRowType = beamSqlRowType;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index 21e02a7..fbc1fd8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -22,14 +22,13 @@ import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.BeamSql;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -54,39 +53,39 @@ class BeamSqlExample {
//define the input row format
List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
- BeamSqlRowType type = BeamSqlRowType.create(fieldNames, fieldTypes);
- BeamSqlRow row = new BeamSqlRow(type);
+ BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
+ BeamRecord row = new BeamRecord(type);
row.addField(0, 1);
row.addField(1, "row");
row.addField(2, 1.0);
//create a source PCollection with Create.of();
- PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
- .withCoder(new BeamSqlRowCoder(type)));
+ PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row)
+ .withCoder(type.getRecordCoder()));
//Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
- PCollection<BeamSqlRow> outputStream = inputTable.apply(
+ PCollection<BeamRecord> outputStream = inputTable.apply(
BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1"));
//print the output record of case 1;
outputStream.apply("log_result",
- MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
- public Void apply(BeamSqlRow input) {
+ MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() {
+ public Void apply(BeamRecord input) {
System.out.println("PCOLLECTION: " + input);
return null;
}
}));
//Case 2. run the query with BeamSql.query over result PCollection of case 1.
- PCollection<BeamSqlRow> outputStream2 =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("CASE1_RESULT"), outputStream)
+ PCollection<BeamRecord> outputStream2 =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("CASE1_RESULT"), outputStream)
.apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1"));
//print the output record of case 2;
outputStream2.apply("log_result",
- MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
+ MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() {
@Override
- public Void apply(BeamSqlRow input) {
+ public Void apply(BeamRecord input) {
System.out.println("TABLE_B: " + input);
return null;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
index 1ae6bb3..3cd6d65 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter;
import java.io.Serializable;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
/**
* {@code BeamSqlExpressionExecutor} fills the gap between relational
@@ -34,10 +34,10 @@ public interface BeamSqlExpressionExecutor extends Serializable {
void prepare();
/**
- * apply transformation to input record {@link BeamSqlRow}.
+ * apply transformation to input record {@link BeamRecord}.
*
*/
- List<Object> execute(BeamSqlRow inputRow);
+ List<Object> execute(BeamRecord inputRow);
void close();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 1f9e0e3..0f77ed8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -88,7 +88,7 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamS
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
@@ -102,7 +102,7 @@ import org.apache.calcite.util.NlsString;
/**
* Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
* {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression},
- * which can be evaluated against the {@link BeamSqlRow}.
+ * which can be evaluated against the {@link BeamRecord}.
*
*/
public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
@@ -427,7 +427,7 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
}
@Override
- public List<Object> execute(BeamSqlRow inputRow) {
+ public List<Object> execute(BeamRecord inputRow) {
List<Object> results = new ArrayList<>();
for (BeamSqlExpression exp : exps) {
results.add(exp.evaluate(inputRow).getValue());
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
index 61e8aae..af48cbe 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
@@ -19,7 +19,7 @@
package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -48,7 +48,7 @@ public class BeamSqlCaseExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
for (int i = 0; i < operands.size() - 1; i += 2) {
if (opValueEvaluated(i, inputRow)) {
return BeamSqlPrimitive.of(
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
index c98c10d..3786281 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.runtime.SqlFunctions;
import org.apache.calcite.sql.type.SqlTypeName;
import org.joda.time.format.DateTimeFormat;
@@ -71,7 +71,7 @@ public class BeamSqlCastExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
SqlTypeName castOutputType = getOutputType();
switch (castOutputType) {
case INTEGER:
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
index dc5db81..f42a365 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.io.Serializable;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -49,7 +49,7 @@ public abstract class BeamSqlExpression implements Serializable {
return op(idx).getOutputType();
}
- public <T> T opValueEvaluated(int idx, BeamSqlRow row) {
+ public <T> T opValueEvaluated(int idx, BeamRecord row) {
return (T) op(idx).evaluate(row).getValue();
}
@@ -59,10 +59,10 @@ public abstract class BeamSqlExpression implements Serializable {
public abstract boolean accept();
/**
- * Apply input record {@link BeamSqlRow} to this expression,
+ * Apply input record {@link BeamRecord} to this expression,
* the output value is wrapped with {@link BeamSqlPrimitive}.
*/
- public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow);
+ public abstract BeamSqlPrimitive evaluate(BeamRecord inputRow);
public List<BeamSqlExpression> getOperands() {
return operands;
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
index 7aba024..8c3d4d4 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -37,7 +37,7 @@ public class BeamSqlInputRefExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
index 6380af9..f763898 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
@@ -21,13 +21,13 @@ import java.math.BigDecimal;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;
/**
* {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}.
- * It holds the value, and return it directly during {@link #evaluate(BeamSqlRow)}.
+ * It holds the value, and return it directly during {@link #evaluate(BeamRecord)}.
*
*/
public class BeamSqlPrimitive<T> extends BeamSqlExpression {
@@ -145,7 +145,7 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) {
+ public BeamSqlPrimitive<T> evaluate(BeamRecord inputRow) {
return this;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
index 243baaa..c1fa2c7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -41,7 +41,7 @@ public class BeamSqlReinterpretExpression extends BeamSqlExpression {
&& SqlTypeName.DATETIME_TYPES.contains(opType(0));
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
if (opType(0) == SqlTypeName.TIME) {
GregorianCalendar date = opValueEvaluated(0, inputRow);
return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
index eebb97c..da706f3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -51,7 +51,7 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
if (method == null) {
reConstructMethod();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
index 0bd68df..2f4c165 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -18,7 +18,7 @@
package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -34,7 +34,7 @@ public class BeamSqlWindowEndExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+ public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) {
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
new Date(inputRow.getWindowEnd().getMillis()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
index b560ef8..2f3dd5c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.util.Date;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -42,7 +42,7 @@ public class BeamSqlWindowExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+ public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) {
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
(Date) operands.get(0).evaluate(inputRow).getValue());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
index e2c1b34..9186ec0 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -18,7 +18,7 @@
package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -35,7 +35,7 @@ public class BeamSqlWindowStartExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+ public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) {
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
new Date(inputRow.getWindowStart().getMillis()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
index b07b28f..fd36457 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -50,7 +50,7 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
super(operands, outputType);
}
- @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) {
BigDecimal left = BigDecimal.valueOf(
Double.valueOf(opValueEvaluated(0, inputRow).toString()));
BigDecimal right = BigDecimal.valueOf(
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
index 811b582..93032ae 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -51,7 +51,7 @@ public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+ public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
Object leftValue = operands.get(0).evaluate(inputRow).getValue();
Object rightValue = operands.get(1).evaluate(inputRow).getValue();
switch (operands.get(0).getOutputType()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
index 88dc73f..7177d96 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
@@ -21,7 +21,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -46,7 +46,7 @@ public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+ public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
Object leftValue = operands.get(0).evaluate(inputRow).getValue();
return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
index b626ce7..c74fcd9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
@@ -21,7 +21,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -46,7 +46,7 @@ public class BeamSqlIsNullExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+ public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
Object leftValue = operands.get(0).evaluate(inputRow).getValue();
return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
index d5793d5..86abe43 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
@@ -22,7 +22,7 @@ import java.util.Collections;
import java.util.Date;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -38,7 +38,7 @@ public class BeamSqlCurrentDateExpression extends BeamSqlExpression {
return getOperands().size() == 0;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
return BeamSqlPrimitive.of(outputType, new Date());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
index 99eea95..d8de464 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
@@ -24,7 +24,7 @@ import java.util.List;
import java.util.TimeZone;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -44,7 +44,7 @@ public class BeamSqlCurrentTimeExpression extends BeamSqlExpression {
return opCount <= 1;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
GregorianCalendar ret = new GregorianCalendar(TimeZone.getDefault());
ret.setTime(new Date());
return BeamSqlPrimitive.of(outputType, ret);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
index 09a3c60..4736571 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
@@ -22,7 +22,7 @@ import java.util.Date;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -42,7 +42,7 @@ public class BeamSqlCurrentTimestampExpression extends BeamSqlExpression {
return opCount <= 1;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
return BeamSqlPrimitive.of(outputType, new Date());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
index 55b6fcd..55767fa 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
@@ -22,7 +22,7 @@ import java.util.Date;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -41,7 +41,7 @@ public class BeamSqlDateCeilExpression extends BeamSqlExpression {
&& opType(1) == SqlTypeName.SYMBOL;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
Date date = opValueEvaluated(0, inputRow);
long time = date.getTime();
TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
index f031c31..3310da5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
@@ -22,7 +22,7 @@ import java.util.Date;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -41,7 +41,7 @@ public class BeamSqlDateFloorExpression extends BeamSqlExpression {
&& opType(1) == SqlTypeName.SYMBOL;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
Date date = opValueEvaluated(0, inputRow);
long time = date.getTime();
TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
index 2740f82..47cd879 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -61,7 +61,7 @@ public class BeamSqlExtractExpression extends BeamSqlExpression {
&& opType(1) == SqlTypeName.BIGINT;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
Long time = opValueEvaluated(1, inputRow);
TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue();
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
index 0c8854c..b8964d5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -32,7 +32,7 @@ public class BeamSqlAndExpression extends BeamSqlLogicalExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+ public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
boolean result = true;
for (BeamSqlExpression exp : operands) {
BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
index 65634b0..f9578b9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -42,7 +42,7 @@ public class BeamSqlNotExpression extends BeamSqlLogicalExpression {
return super.accept();
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
Boolean value = opValueEvaluated(0, inputRow);
if (value == null) {
return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
index da15c34..88a3916 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -32,7 +32,7 @@ public class BeamSqlOrExpression extends BeamSqlLogicalExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+ public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
boolean result = false;
for (BeamSqlExpression exp : operands) {
BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
index c12b725..8f6c00c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -38,7 +38,7 @@ public abstract class BeamSqlMathBinaryExpression extends BeamSqlExpression {
return numberOfOperands() == 2 && isOperandNumeric(opType(0)) && isOperandNumeric(opType(1));
}
- @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) {
BeamSqlExpression leftOp = op(0);
BeamSqlExpression rightOp = op(1);
return calculate(leftOp.evaluate(inputRow), rightOp.evaluate(inputRow));
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
index 163c40e..b225b8e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -45,7 +45,7 @@ public abstract class BeamSqlMathUnaryExpression extends BeamSqlExpression {
return acceptance;
}
- @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) {
BeamSqlExpression operand = op(0);
return calculate(operand.evaluate(inputRow));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
index dfaf546..676f859 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -36,7 +36,7 @@ public class BeamSqlPiExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
index f2d7a47..0575978 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
import java.util.Random;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -42,7 +42,7 @@ public class BeamSqlRandExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+ public BeamSqlPrimitive evaluate(BeamRecord inputRecord) {
if (operands.size() == 1) {
int rowSeed = opValueEvaluated(0, inputRecord);
if (seed == null || seed != rowSeed) {
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
index b2e65ce..52f0cc1 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
import java.util.Random;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -43,7 +43,7 @@ public class BeamSqlRandIntegerExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+ public BeamSqlPrimitive evaluate(BeamRecord inputRecord) {
int numericIdx = 0;
if (operands.size() == 2) {
int rowSeed = opValueEvaluated(0, inputRecord);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
index 580d747..974e2bc 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -32,7 +32,7 @@ public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.INTEGER);
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
String str = opValueEvaluated(0, inputRow);
return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
index 772ad41..14ef55d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -51,7 +51,7 @@ public class BeamSqlConcatExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
String left = opValueEvaluated(0, inputRow);
String right = opValueEvaluated(1, inputRow);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
index dc893e7..e50872b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -32,7 +32,7 @@ public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.VARCHAR);
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
String str = opValueEvaluated(0, inputRow);
StringBuilder ret = new StringBuilder(str);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
index fd9d7aa..0f9a501 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -32,7 +32,7 @@ public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.VARCHAR);
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
String str = opValueEvaluated(0, inputRow);
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
index 8d38efb..2336876 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -54,7 +54,7 @@ public class BeamSqlOverlayExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
String str = opValueEvaluated(0, inputRow);
String replaceStr = opValueEvaluated(1, inputRow);
int idx = opValueEvaluated(2, inputRow);