You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/10 21:00:35 UTC
[1/2] beam git commit: update JavaDoc for BeamRecord,
BeamRecordType. Also only create new UDF class instances for
SerializableFunction UDFs.
Repository: beam
Updated Branches:
refs/heads/DSL_SQL 2a1377e1c -> f37a7a19c
Update Javadoc for BeamRecord and BeamRecordType.
Also, only create a new UDF class instance when the UDF method is non-static (e.g. SerializableFunction UDFs).
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd27036a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd27036a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd27036a
Branch: refs/heads/DSL_SQL
Commit: cd27036a1c537a9059f13bbcdaec0264481ebd0b
Parents: 2a1377e
Author: mingmxu <mi...@ebay.com>
Authored: Wed Aug 9 13:30:16 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Thu Aug 10 13:58:59 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/values/BeamRecord.java | 118 ++++++++++++++++++-
.../apache/beam/sdk/values/BeamRecordType.java | 60 ++++++----
.../operator/BeamSqlUdfExpression.java | 5 +-
3 files changed, 159 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/cd27036a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index fa3b574..fd26f46 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -25,11 +25,17 @@ import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
/**
- * {@link org.apache.beam.sdk.values.BeamRecord}, self-described with
- * {@link BeamRecordType}, represents one element in a
- * {@link org.apache.beam.sdk.values.PCollection}.
+ * {@link BeamRecord} is an immutable tuple-like type to represent one element in a
+ * {@link PCollection}. The fields are described with a {@link BeamRecordType}.
+ *
+ * <p>By default, {@link BeamRecordType} only contains the name and Coder of each field. It
+ * can be extended to support more sophisticated validation by overriding
+ * {@link BeamRecordType#validateValueType(int, Object)}.
+ *
+ * <p>A Coder {@link BeamRecordCoder} is provided, which wraps the Coder for each data field.
*/
@Experimental
public class BeamRecord implements Serializable {
@@ -63,6 +69,9 @@ public class BeamRecord implements Serializable {
}
}
+ /**
+ * See {@link #BeamRecord(BeamRecordType, List)}.
+ */
public BeamRecord(BeamRecordType dataType, Object... rawdataValues) {
this(dataType, Arrays.asList(rawdataValues));
}
@@ -72,110 +81,213 @@ public class BeamRecord implements Serializable {
dataValues.set(index, fieldValue);
}
+ /**
+ * Get value by field name.
+ */
public Object getFieldValue(String fieldName) {
return getFieldValue(dataType.getFieldNames().indexOf(fieldName));
}
+ /**
+ * Get a {@link Byte} value by field name, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Byte getByte(String fieldName) {
return (Byte) getFieldValue(fieldName);
}
+ /**
+ * Get a {@link Short} value by field name, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Short getShort(String fieldName) {
return (Short) getFieldValue(fieldName);
}
+ /**
+ * Get an {@link Integer} value by field name, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Integer getInteger(String fieldName) {
return (Integer) getFieldValue(fieldName);
}
+ /**
+ * Get a {@link Float} value by field name, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Float getFloat(String fieldName) {
return (Float) getFieldValue(fieldName);
}
+ /**
+ * Get a {@link Double} value by field name, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Double getDouble(String fieldName) {
return (Double) getFieldValue(fieldName);
}
+ /**
+ * Get a {@link Long} value by field name, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Long getLong(String fieldName) {
return (Long) getFieldValue(fieldName);
}
+ /**
+ * Get a {@link String} value by field name, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public String getString(String fieldName) {
return (String) getFieldValue(fieldName);
}
+ /**
+ * Get a {@link Date} value by field name, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Date getDate(String fieldName) {
return (Date) getFieldValue(fieldName);
}
+ /**
+ * Get a {@link GregorianCalendar} value by field name, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public GregorianCalendar getGregorianCalendar(String fieldName) {
return (GregorianCalendar) getFieldValue(fieldName);
}
+ /**
+ * Get a {@link BigDecimal} value by field name, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public BigDecimal getBigDecimal(String fieldName) {
return (BigDecimal) getFieldValue(fieldName);
}
+ /**
+ * Get a {@link Boolean} value by field name, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Boolean getBoolean(String fieldName) {
return (Boolean) getFieldValue(fieldName);
}
+ /**
+ * Get value by field index.
+ */
public Object getFieldValue(int fieldIdx) {
return dataValues.get(fieldIdx);
}
+ /**
+ * Get a {@link Byte} value by field index, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Byte getByte(int idx) {
return (Byte) getFieldValue(idx);
}
+ /**
+ * Get a {@link Short} value by field index, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Short getShort(int idx) {
return (Short) getFieldValue(idx);
}
+ /**
+ * Get an {@link Integer} value by field index, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Integer getInteger(int idx) {
return (Integer) getFieldValue(idx);
}
+ /**
+ * Get a {@link Float} value by field index, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Float getFloat(int idx) {
return (Float) getFieldValue(idx);
}
+ /**
+ * Get a {@link Double} value by field index, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Double getDouble(int idx) {
return (Double) getFieldValue(idx);
}
+ /**
+ * Get a {@link Long} value by field index, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Long getLong(int idx) {
return (Long) getFieldValue(idx);
}
+ /**
+ * Get a {@link String} value by field index, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public String getString(int idx) {
return (String) getFieldValue(idx);
}
+ /**
+ * Get a {@link Date} value by field index, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Date getDate(int idx) {
return (Date) getFieldValue(idx);
}
+ /**
+ * Get a {@link GregorianCalendar} value by field index, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public GregorianCalendar getGregorianCalendar(int idx) {
return (GregorianCalendar) getFieldValue(idx);
}
+ /**
+ * Get a {@link BigDecimal} value by field index, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public BigDecimal getBigDecimal(int idx) {
return (BigDecimal) getFieldValue(idx);
}
+ /**
+ * Get a {@link Boolean} value by field index, {@link ClassCastException} is thrown
+ * if type doesn't match.
+ */
public Boolean getBoolean(int idx) {
return (Boolean) getFieldValue(idx);
}
+ /**
+ * Return the number of data fields.
+ */
public int getFieldCount() {
return dataValues.size();
}
+ /**
+ * Return the list of data values.
+ */
public List<Object> getDataValues() {
return dataValues;
}
+ /**
+ * Return {@link BeamRecordType} which describes the fields.
+ */
public BeamRecordType getDataType() {
return dataType;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/cd27036a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
index 29cc80d..620361c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
@@ -25,14 +25,22 @@ import org.apache.beam.sdk.coders.BeamRecordCoder;
import org.apache.beam.sdk.coders.Coder;
/**
- * The default type provider used in {@link BeamRecord}.
+ * {@link BeamRecordType} describes the fields in {@link BeamRecord}. Extra checking can be added
+ * by overriding {@link BeamRecordType#validateValueType(int, Object)}.
*/
@Experimental
public class BeamRecordType implements Serializable{
private List<String> fieldNames;
private List<Coder> fieldCoders;
+ /**
+ * Create a {@link BeamRecordType} with a name and Coder for each field.
+ */
public BeamRecordType(List<String> fieldNames, List<Coder> fieldCoders) {
+ if (fieldNames.size() != fieldCoders.size()) {
+ throw new IllegalStateException(
+ "the size of fieldNames and fieldCoders need to be the same.");
+ }
this.fieldNames = fieldNames;
this.fieldCoders = fieldCoders;
}
@@ -41,30 +49,42 @@ public class BeamRecordType implements Serializable{
* Validate input fieldValue for a field.
* @throws IllegalArgumentException throw exception when the validation fails.
*/
- public void validateValueType(int index, Object fieldValue)
- throws IllegalArgumentException{
- //do nothing by default.
- }
+ public void validateValueType(int index, Object fieldValue)
+ throws IllegalArgumentException{
+ //do nothing by default.
+ }
- /**
- * Get the coder for {@link BeamRecordCoder}.
- */
- public BeamRecordCoder getRecordCoder(){
- return BeamRecordCoder.of(this, fieldCoders);
- }
+ /**
+ * Return the coder for {@link BeamRecord}, which wraps the Coder of each field.
+ */
+ public BeamRecordCoder getRecordCoder(){
+ return BeamRecordCoder.of(this, fieldCoders);
+ }
- public List<String> getFieldNames(){
- return ImmutableList.copyOf(fieldNames);
- }
+ /**
+ * Returns an immutable list of field names.
+ */
+ public List<String> getFieldNames(){
+ return ImmutableList.copyOf(fieldNames);
+ }
- public String getFieldNameByIndex(int index){
- return fieldNames.get(index);
- }
+ /**
+ * Return the name of the field at the given index.
+ */
+ public String getFieldNameByIndex(int index){
+ return fieldNames.get(index);
+ }
- public int findIndexOfField(String fieldName){
- return fieldNames.indexOf(fieldName);
- }
+ /**
+ * Find the index of the field with the given name, or -1 if there is no such field.
+ */
+ public int findIndexOfField(String fieldName){
+ return fieldNames.indexOf(fieldName);
+ }
+ /**
+ * Return the count of fields.
+ */
public int getFieldCount(){
return fieldNames.size();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/cd27036a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
index 123e6a0..625de2c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -79,8 +80,10 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
for (String pc : paraClassName) {
paraClass.add(Class.forName(pc));
}
- udfIns = Class.forName(className).newInstance();
method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {}));
+ if (!Modifier.isStatic(method.getModifiers())) {
+ udfIns = Class.forName(className).newInstance();
+ }
} catch (Exception e) {
throw new RuntimeException(e);
}
[2/2] beam git commit: [BEAM-2741] This closes #3710
Posted by ta...@apache.org.
[BEAM-2741] This closes #3710
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f37a7a19
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f37a7a19
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f37a7a19
Branch: refs/heads/DSL_SQL
Commit: f37a7a19c35cba03225694af7491a89dbfe06de3
Parents: 2a1377e cd27036
Author: Tyler Akidau <ta...@apache.org>
Authored: Thu Aug 10 13:59:45 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Thu Aug 10 13:59:45 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/values/BeamRecord.java | 118 ++++++++++++++++++-
.../apache/beam/sdk/values/BeamRecordType.java | 60 ++++++----
.../operator/BeamSqlUdfExpression.java | 5 +-
3 files changed, 159 insertions(+), 24 deletions(-)
----------------------------------------------------------------------