You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/04 17:09:45 UTC
[2/7] beam git commit: use BitSet for nullFields
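use BitSet for nullFields (replaces the List<Integer> of null-field indexes in BeamRecord with a BitSet, and ListCoder<Integer> with BitSetCoder in BeamRecordCoder and BeamSqlRowCoder)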
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/129ae969
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/129ae969
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/129ae969
Branch: refs/heads/DSL_SQL
Commit: 129ae9696af6a2f8d83ee962ca2ba8a7d6e3fd40
Parents: 52933a6
Author: mingmxu <mi...@ebay.com>
Authored: Thu Aug 3 00:43:46 2017 -0700
Committer: mingmxu <mi...@ebay.com>
Committed: Thu Aug 3 00:43:46 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/coders/BeamRecordCoder.java | 18 +++++++++++----
.../org/apache/beam/sdk/values/BeamRecord.java | 24 ++++++++++----------
.../extensions/sql/schema/BeamSqlRowCoder.java | 14 ++++++------
3 files changed, 32 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/129ae969/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
index ad27f4e..27f92ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.coders;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.BitSet;
import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.values.BeamRecord;
@@ -30,23 +31,30 @@ import org.apache.beam.sdk.values.BeamRecordTypeProvider;
*/
@Experimental
public class BeamRecordCoder extends CustomCoder<BeamRecord> {
- private static final ListCoder<Integer> nullListCoder = ListCoder.of(BigEndianIntegerCoder.of());
+ private static final BitSetCoder nullListCoder = BitSetCoder.of();
private static final InstantCoder instantCoder = InstantCoder.of();
private BeamRecordTypeProvider recordType;
private List<Coder> coderArray;
- public BeamRecordCoder(BeamRecordTypeProvider recordType, List<Coder> coderArray) {
+ private BeamRecordCoder(BeamRecordTypeProvider recordType, List<Coder> coderArray) {
this.recordType = recordType;
this.coderArray = coderArray;
}
+ public static BeamRecordCoder of(BeamRecordTypeProvider recordType, List<Coder> coderArray){
+ if (recordType.size() != coderArray.size()) {
+ throw new IllegalArgumentException("Coder size doesn't match with field size");
+ }
+ return new BeamRecordCoder(recordType, coderArray);
+ }
+
@Override
public void encode(BeamRecord value, OutputStream outStream)
throws CoderException, IOException {
nullListCoder.encode(value.getNullFields(), outStream);
for (int idx = 0; idx < value.size(); ++idx) {
- if (value.getNullFields().contains(idx)) {
+ if (value.getNullFields().get(idx)) {
continue;
}
@@ -59,12 +67,12 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
@Override
public BeamRecord decode(InputStream inStream) throws CoderException, IOException {
- List<Integer> nullFields = nullListCoder.decode(inStream);
+ BitSet nullFields = nullListCoder.decode(inStream);
BeamRecord record = new BeamRecord(recordType);
record.setNullFields(nullFields);
for (int idx = 0; idx < recordType.size(); ++idx) {
- if (nullFields.contains(idx)) {
+ if (nullFields.get(idx)) {
continue;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/129ae969/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index d1c1c17..476233e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.values;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
@@ -36,9 +37,9 @@ import org.joda.time.Instant;
*/
@Experimental
public class BeamRecord implements Serializable {
- //null values are indexed here, to handle properly in Coder.
- private List<Integer> nullFields = new ArrayList<>();
private List<Object> dataValues;
+ //null values are indexed here, to handle properly in Coder.
+ private BitSet nullFields;
private BeamRecordTypeProvider dataType;
private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
@@ -46,10 +47,11 @@ public class BeamRecord implements Serializable {
public BeamRecord(BeamRecordTypeProvider dataType) {
this.dataType = dataType;
+ this.nullFields = new BitSet(dataType.size());
this.dataValues = new ArrayList<>();
for (int idx = 0; idx < dataType.size(); ++idx) {
dataValues.add(null);
- nullFields.add(idx);
+ nullFields.set(idx);
}
}
@@ -79,9 +81,7 @@ public class BeamRecord implements Serializable {
if (fieldValue == null) {
return;
} else {
- if (nullFields.contains(index)) {
- nullFields.remove(nullFields.indexOf(index));
- }
+ nullFields.clear(index);
}
dataType.validateValueType(index, fieldValue);
@@ -137,7 +137,7 @@ public class BeamRecord implements Serializable {
}
public Object getFieldValue(int fieldIdx) {
- if (nullFields.contains(fieldIdx)) {
+ if (nullFields.get(fieldIdx)) {
return null;
}
@@ -208,19 +208,19 @@ public class BeamRecord implements Serializable {
this.dataType = dataType;
}
- public void setNullFields(List<Integer> nullFields) {
- this.nullFields = nullFields;
+ public BitSet getNullFields() {
+ return nullFields;
}
- public List<Integer> getNullFields() {
- return nullFields;
+ public void setNullFields(BitSet nullFields) {
+ this.nullFields = nullFields;
}
/**
* is the specified field NULL?
*/
public boolean isNull(int idx) {
- return nullFields.contains(idx);
+ return nullFields.get(idx);
}
public Instant getWindowStart() {
http://git-wip-us.apache.org/repos/asf/beam/blob/129ae969/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
index 3d760c4..c7656af 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
@@ -21,19 +21,19 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Types;
+import java.util.BitSet;
import java.util.Date;
import java.util.GregorianCalendar;
-import java.util.List;
import org.apache.beam.sdk.coders.BigDecimalCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.BitSetCoder;
import org.apache.beam.sdk.coders.ByteCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
/**
@@ -42,7 +42,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
private BeamSqlRowType sqlRecordType;
- private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
+ private static final BitSetCoder nullListCoder = BitSetCoder.of();
private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
@@ -59,9 +59,9 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
@Override
public void encode(BeamSqlRow value, OutputStream outStream)
throws CoderException, IOException {
- listCoder.encode(value.getNullFields(), outStream);
+ nullListCoder.encode(value.getNullFields(), outStream);
for (int idx = 0; idx < value.size(); ++idx) {
- if (value.getNullFields().contains(idx)) {
+ if (value.getNullFields().get(idx)) {
continue;
}
@@ -114,12 +114,12 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
@Override
public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
- List<Integer> nullFields = listCoder.decode(inStream);
+ BitSet nullFields = nullListCoder.decode(inStream);
BeamSqlRow record = new BeamSqlRow(sqlRecordType);
record.setNullFields(nullFields);
for (int idx = 0; idx < sqlRecordType.size(); ++idx) {
- if (nullFields.contains(idx)) {
+ if (nullFields.get(idx)) {
continue;
}