You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/10 20:38:49 UTC
[1/2] beam git commit: [BEAM-2744] rename BeamRecordType#size()
Repository: beam
Updated Branches:
refs/heads/DSL_SQL be01be5cb -> 2a1377e1c
[BEAM-2744] rename BeamRecordType#size()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/affb8f6e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/affb8f6e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/affb8f6e
Branch: refs/heads/DSL_SQL
Commit: affb8f6e9f1fb3e4df79c787528b7af726100d29
Parents: be01be5
Author: James Xu <xu...@gmail.com>
Authored: Tue Aug 8 15:15:59 2017 +0800
Committer: Tyler Akidau <ta...@apache.org>
Committed: Thu Aug 10 13:06:15 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/coders/BeamRecordCoder.java | 12 +++++------
.../org/apache/beam/sdk/values/BeamRecord.java | 22 +++++---------------
.../apache/beam/sdk/values/BeamRecordType.java | 7 ++++++-
.../extensions/sql/impl/rel/BeamJoinRel.java | 2 +-
.../extensions/sql/impl/rel/BeamValuesRel.java | 2 +-
.../sql/impl/transform/BeamJoinTransforms.java | 4 ++--
.../extensions/sql/schema/BeamTableUtils.java | 10 ++++-----
.../sql/BeamSqlDslAggregationTest.java | 16 +++++++-------
.../beam/sdk/extensions/sql/TestUtils.java | 6 +++---
9 files changed, 38 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
index 4e24b82..cbed87d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
@@ -43,7 +43,7 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
}
public static BeamRecordCoder of(BeamRecordType recordType, List<Coder> coderArray){
- if (recordType.size() != coderArray.size()) {
+ if (recordType.getFieldCount() != coderArray.size()) {
throw new IllegalArgumentException("Coder size doesn't match with field size");
}
return new BeamRecordCoder(recordType, coderArray);
@@ -57,7 +57,7 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
public void encode(BeamRecord value, OutputStream outStream)
throws CoderException, IOException {
nullListCoder.encode(scanNullFields(value), outStream);
- for (int idx = 0; idx < value.size(); ++idx) {
+ for (int idx = 0; idx < value.getFieldCount(); ++idx) {
if (value.getFieldValue(idx) == null) {
continue;
}
@@ -70,8 +70,8 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
public BeamRecord decode(InputStream inStream) throws CoderException, IOException {
BitSet nullFields = nullListCoder.decode(inStream);
- List<Object> fieldValues = new ArrayList<>(recordType.size());
- for (int idx = 0; idx < recordType.size(); ++idx) {
+ List<Object> fieldValues = new ArrayList<>(recordType.getFieldCount());
+ for (int idx = 0; idx < recordType.getFieldCount(); ++idx) {
if (nullFields.get(idx)) {
fieldValues.add(null);
} else {
@@ -87,8 +87,8 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
* Scan {@link BeamRecord} to find fields with a NULL value.
*/
private BitSet scanNullFields(BeamRecord record){
- BitSet nullFields = new BitSet(record.size());
- for (int idx = 0; idx < record.size(); ++idx) {
+ BitSet nullFields = new BitSet(record.getFieldCount());
+ for (int idx = 0; idx < record.getFieldCount(); ++idx) {
if (record.getFieldValue(idx) == null) {
nullFields.set(idx);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index a3ede3c..fa3b574 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -52,13 +52,13 @@ public class BeamRecord implements Serializable {
}
this.dataType = dataType;
- this.dataValues = new ArrayList<>(dataType.size());
+ this.dataValues = new ArrayList<>(dataType.getFieldCount());
- for (int idx = 0; idx < dataType.size(); ++idx) {
+ for (int idx = 0; idx < dataType.getFieldCount(); ++idx) {
dataValues.add(null);
}
- for (int idx = 0; idx < dataType.size(); ++idx) {
+ for (int idx = 0; idx < dataType.getFieldCount(); ++idx) {
addField(idx, rawDataValues.get(idx));
}
}
@@ -168,7 +168,7 @@ public class BeamRecord implements Serializable {
return (Boolean) getFieldValue(idx);
}
- public int size() {
+ public int getFieldCount() {
return dataValues.size();
}
@@ -182,19 +182,7 @@ public class BeamRecord implements Serializable {
@Override
public String toString() {
- return "BeamSqlRow [dataValues=" + dataValues + ", dataType=" + dataType + "]";
- }
-
- /**
- * Return data fields as key=value.
- */
- public String valueInString() {
- StringBuilder sb = new StringBuilder();
- for (int idx = 0; idx < size(); ++idx) {
- sb.append(
- String.format(",%s=%s", getDataType().getFieldNames().get(idx), getFieldValue(idx)));
- }
- return sb.substring(1);
+ return "BeamRecord [dataValues=" + dataValues + ", dataType=" + dataType + "]";
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
index 6ab783c..29cc80d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
@@ -65,7 +65,12 @@ public class BeamRecordType implements Serializable{
return fieldNames.indexOf(fieldName);
}
- public int size(){
+ public int getFieldCount(){
return fieldNames.size();
}
+
+ @Override
+ public String toString() {
+ return "BeamRecordType [fieldsName=" + fieldNames + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 9dceb25..5ac9575 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -256,7 +256,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
private BeamRecord buildNullRow(BeamRelNode relNode) {
BeamRecordSqlType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
- return new BeamRecord(leftType, Collections.nCopies(leftType.size(), null));
+ return new BeamRecord(leftType, Collections.nCopies(leftType.getFieldCount(), null));
}
private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index fde002e..c4caff3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -65,7 +65,7 @@ public class BeamValuesRel extends Values implements BeamRelNode {
BeamRecordSqlType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
for (ImmutableList<RexLiteral> tuple : tuples) {
- List<Object> fieldsValue = new ArrayList<>(beamSQLRowType.size());
+ List<Object> fieldsValue = new ArrayList<>(beamSQLRowType.getFieldCount());
for (int i = 0; i < tuple.size(); i++) {
fieldsValue.add(BeamTableUtils.autoCastField(
beamSQLRowType.getFieldTypeByIndex(i), tuple.get(i).getValue()));
http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index 9a48c53..7a8d10d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -145,11 +145,11 @@ public class BeamJoinTransforms {
private static BeamRecord combineTwoRowsIntoOneHelper(BeamRecord leftRow,
BeamRecord rightRow) {
// build the type
- List<String> names = new ArrayList<>(leftRow.size() + rightRow.size());
+ List<String> names = new ArrayList<>(leftRow.getFieldCount() + rightRow.getFieldCount());
names.addAll(leftRow.getDataType().getFieldNames());
names.addAll(rightRow.getDataType().getFieldNames());
- List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size());
+ List<Integer> types = new ArrayList<>(leftRow.getFieldCount() + rightRow.getFieldCount());
types.addAll(BeamSqlRecordHelper.getSqlRecordType(leftRow).getFieldTypes());
types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldTypes());
BeamRecordSqlType type = BeamRecordSqlType.create(names, types);
http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
index 99f9522..687a082 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
@@ -41,18 +41,18 @@ public final class BeamTableUtils {
CSVFormat csvFormat,
String line,
BeamRecordSqlType beamRecordSqlType) {
- List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.size());
+ List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.getFieldCount());
try (StringReader reader = new StringReader(line)) {
CSVParser parser = csvFormat.parse(reader);
CSVRecord rawRecord = parser.getRecords().get(0);
- if (rawRecord.size() != beamRecordSqlType.size()) {
+ if (rawRecord.size() != beamRecordSqlType.getFieldCount()) {
throw new IllegalArgumentException(String.format(
"Expect %d fields, but actually %d",
- beamRecordSqlType.size(), rawRecord.size()
+ beamRecordSqlType.getFieldCount(), rawRecord.size()
));
} else {
- for (int idx = 0; idx < beamRecordSqlType.size(); idx++) {
+ for (int idx = 0; idx < beamRecordSqlType.getFieldCount(); idx++) {
String raw = rawRecord.get(idx);
fieldsValue.add(autoCastField(beamRecordSqlType.getFieldTypeByIndex(idx), raw));
}
@@ -66,7 +66,7 @@ public final class BeamTableUtils {
public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) {
StringWriter writer = new StringWriter();
try (CSVPrinter printer = csvFormat.print(writer)) {
- for (int i = 0; i < row.size(); i++) {
+ for (int i = 0; i < row.getFieldCount(); i++) {
printer.print(row.getFieldValue(i).toString());
}
printer.println();
http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 4e74dbb..db562da 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -49,7 +49,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
}
private void runAggregationWithoutWindow(PCollection<BeamRecord> input) throws Exception {
- String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
+ String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount` FROM PCOLLECTION GROUP BY f_int2";
PCollection<BeamRecord> result =
input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
@@ -57,6 +57,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int2", "size"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
+
BeamRecord record = new BeamRecord(resultType, 0, 4L);
PAssert.that(result).containsInAnyOrder(record);
@@ -81,7 +82,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
}
private void runAggregationFunctions(PCollection<BeamRecord> input) throws Exception{
- String sql = "select f_int2, count(*) as size, "
+ String sql = "select f_int2, count(*) as getFieldCount, "
+ "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1,"
+ "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2,"
+ "sum(f_byte) as sum3, avg(f_byte) as avg3, max(f_byte) as max3, min(f_byte) as min3,"
@@ -171,7 +172,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
}
private void runTumbleWindow(PCollection<BeamRecord> input) throws Exception {
- String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+ String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`,"
+ " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`"
+ " FROM TABLE_A"
+ " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
@@ -208,7 +209,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
}
private void runHopWindow(PCollection<BeamRecord> input) throws Exception {
- String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+ String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`,"
+ " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`"
+ " FROM PCOLLECTION"
+ " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
@@ -246,7 +247,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
}
private void runSessionWindow(PCollection<BeamRecord> input) throws Exception {
- String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+ String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`,"
+ " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`"
+ " FROM TABLE_A"
+ " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
@@ -273,7 +274,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
"Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'");
pipeline.enableAbandonedNodeEnforcement(false);
- String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+ String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount` FROM TABLE_A "
+ "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)";
PCollection<BeamRecord> result =
PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1)
@@ -288,7 +289,8 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
exceptions.expectMessage("Encountered \"*\"");
pipeline.enableAbandonedNodeEnforcement(false);
- String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2";
+ String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` "
+ + "FROM PCOLLECTION GROUP BY f_int2";
PCollection<BeamRecord> result =
boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql));
http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index aa1fc29..373deb7 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -35,7 +35,7 @@ public class TestUtils {
public static class BeamSqlRow2StringDoFn extends DoFn<BeamRecord, String> {
@ProcessElement
public void processElement(ProcessContext ctx) {
- ctx.output(ctx.element().valueInString());
+ ctx.output(ctx.element().toString());
}
}
@@ -45,7 +45,7 @@ public class TestUtils {
public static List<String> beamSqlRows2Strings(List<BeamRecord> rows) {
List<String> strs = new ArrayList<>();
for (BeamRecord row : rows) {
- strs.add(row.valueInString());
+ strs.add(row.toString());
}
return strs;
@@ -181,7 +181,7 @@ public class TestUtils {
*/
public static List<BeamRecord> buildRows(BeamRecordSqlType type, List args) {
List<BeamRecord> rows = new ArrayList<>();
- int fieldCount = type.size();
+ int fieldCount = type.getFieldCount();
for (int i = 0; i < args.size(); i += fieldCount) {
rows.add(new BeamRecord(type, args.subList(i, i + fieldCount)));
[2/2] beam git commit: [BEAM-2744] This closes #3702
Posted by ta...@apache.org.
[BEAM-2744] This closes #3702
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2a1377e1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2a1377e1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2a1377e1
Branch: refs/heads/DSL_SQL
Commit: 2a1377e1c3aae2234f20b38c2203a2d8d7c78f6b
Parents: be01be5 affb8f6
Author: Tyler Akidau <ta...@apache.org>
Authored: Thu Aug 10 13:23:37 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Thu Aug 10 13:23:37 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/coders/BeamRecordCoder.java | 12 +++++------
.../org/apache/beam/sdk/values/BeamRecord.java | 22 +++++---------------
.../apache/beam/sdk/values/BeamRecordType.java | 7 ++++++-
.../extensions/sql/impl/rel/BeamJoinRel.java | 2 +-
.../extensions/sql/impl/rel/BeamValuesRel.java | 2 +-
.../sql/impl/transform/BeamJoinTransforms.java | 4 ++--
.../extensions/sql/schema/BeamTableUtils.java | 10 ++++-----
.../sql/BeamSqlDslAggregationTest.java | 16 +++++++-------
.../beam/sdk/extensions/sql/TestUtils.java | 6 +++---
9 files changed, 38 insertions(+), 43 deletions(-)
----------------------------------------------------------------------