You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/04 17:09:45 UTC

[2/7] beam git commit: use BitSet for nullFields

Use a BitSet instead of a List<Integer> to track null fields (nullFields)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/129ae969
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/129ae969
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/129ae969

Branch: refs/heads/DSL_SQL
Commit: 129ae9696af6a2f8d83ee962ca2ba8a7d6e3fd40
Parents: 52933a6
Author: mingmxu <mi...@ebay.com>
Authored: Thu Aug 3 00:43:46 2017 -0700
Committer: mingmxu <mi...@ebay.com>
Committed: Thu Aug 3 00:43:46 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/BeamRecordCoder.java | 18 +++++++++++----
 .../org/apache/beam/sdk/values/BeamRecord.java  | 24 ++++++++++----------
 .../extensions/sql/schema/BeamSqlRowCoder.java  | 14 ++++++------
 3 files changed, 32 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/129ae969/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
index ad27f4e..27f92ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.coders;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.BitSet;
 import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.values.BeamRecord;
@@ -30,23 +31,30 @@ import org.apache.beam.sdk.values.BeamRecordTypeProvider;
  */
 @Experimental
 public class BeamRecordCoder extends CustomCoder<BeamRecord> {
-  private static final ListCoder<Integer> nullListCoder = ListCoder.of(BigEndianIntegerCoder.of());
+  private static final BitSetCoder nullListCoder = BitSetCoder.of();
   private static final InstantCoder instantCoder = InstantCoder.of();
 
   private BeamRecordTypeProvider recordType;
   private List<Coder> coderArray;
 
-  public BeamRecordCoder(BeamRecordTypeProvider recordType, List<Coder> coderArray) {
+  private BeamRecordCoder(BeamRecordTypeProvider recordType, List<Coder> coderArray) {
     this.recordType = recordType;
     this.coderArray = coderArray;
   }
 
+  public static BeamRecordCoder of(BeamRecordTypeProvider recordType, List<Coder> coderArray){
+    if (recordType.size() != coderArray.size()) {
+      throw new IllegalArgumentException("Coder size doesn't match with field size");
+    }
+    return new BeamRecordCoder(recordType, coderArray);
+  }
+
   @Override
   public void encode(BeamRecord value, OutputStream outStream)
       throws CoderException, IOException {
     nullListCoder.encode(value.getNullFields(), outStream);
     for (int idx = 0; idx < value.size(); ++idx) {
-      if (value.getNullFields().contains(idx)) {
+      if (value.getNullFields().get(idx)) {
         continue;
       }
 
@@ -59,12 +67,12 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
 
   @Override
   public BeamRecord decode(InputStream inStream) throws CoderException, IOException {
-    List<Integer> nullFields = nullListCoder.decode(inStream);
+    BitSet nullFields = nullListCoder.decode(inStream);
 
     BeamRecord record = new BeamRecord(recordType);
     record.setNullFields(nullFields);
     for (int idx = 0; idx < recordType.size(); ++idx) {
-      if (nullFields.contains(idx)) {
+      if (nullFields.get(idx)) {
         continue;
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/129ae969/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index d1c1c17..476233e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.values;
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
@@ -36,9 +37,9 @@ import org.joda.time.Instant;
  */
 @Experimental
 public class BeamRecord implements Serializable {
-  //null values are indexed here, to handle properly in Coder.
-  private List<Integer> nullFields = new ArrayList<>();
   private List<Object> dataValues;
+  // Null fields are flagged here, one bit per field index, so the Coder can skip them.
+  private BitSet nullFields;
   private BeamRecordTypeProvider dataType;
 
   private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
@@ -46,10 +47,11 @@ public class BeamRecord implements Serializable {
 
   public BeamRecord(BeamRecordTypeProvider dataType) {
     this.dataType = dataType;
+    this.nullFields = new BitSet(dataType.size());
     this.dataValues = new ArrayList<>();
     for (int idx = 0; idx < dataType.size(); ++idx) {
       dataValues.add(null);
-      nullFields.add(idx);
+      nullFields.set(idx);
     }
   }
 
@@ -79,9 +81,7 @@ public class BeamRecord implements Serializable {
     if (fieldValue == null) {
       return;
     } else {
-      if (nullFields.contains(index)) {
-        nullFields.remove(nullFields.indexOf(index));
-      }
+      nullFields.clear(index);
     }
 
     dataType.validateValueType(index, fieldValue);
@@ -137,7 +137,7 @@ public class BeamRecord implements Serializable {
   }
 
   public Object getFieldValue(int fieldIdx) {
-    if (nullFields.contains(fieldIdx)) {
+    if (nullFields.get(fieldIdx)) {
       return null;
     }
 
@@ -208,19 +208,19 @@ public class BeamRecord implements Serializable {
     this.dataType = dataType;
   }
 
-  public void setNullFields(List<Integer> nullFields) {
-    this.nullFields = nullFields;
+  public BitSet getNullFields() {
+    return nullFields;
   }
 
-  public List<Integer> getNullFields() {
-    return nullFields;
+  public void setNullFields(BitSet nullFields) {
+    this.nullFields = nullFields;
   }
 
   /**
    * is the specified field NULL?
    */
   public boolean isNull(int idx) {
-    return nullFields.contains(idx);
+    return nullFields.get(idx);
   }
 
   public Instant getWindowStart() {

http://git-wip-us.apache.org/repos/asf/beam/blob/129ae969/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
index 3d760c4..c7656af 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
@@ -21,19 +21,19 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.sql.Types;
+import java.util.BitSet;
 import java.util.Date;
 import java.util.GregorianCalendar;
-import java.util.List;
 import org.apache.beam.sdk.coders.BigDecimalCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.BitSetCoder;
 import org.apache.beam.sdk.coders.ByteCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 
 /**
@@ -42,7 +42,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
   private BeamSqlRowType sqlRecordType;
 
-  private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
+  private static final BitSetCoder nullListCoder = BitSetCoder.of();
 
   private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
   private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
@@ -59,9 +59,9 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
   @Override
   public void encode(BeamSqlRow value, OutputStream outStream)
       throws CoderException, IOException {
-    listCoder.encode(value.getNullFields(), outStream);
+    nullListCoder.encode(value.getNullFields(), outStream);
     for (int idx = 0; idx < value.size(); ++idx) {
-      if (value.getNullFields().contains(idx)) {
+      if (value.getNullFields().get(idx)) {
         continue;
       }
 
@@ -114,12 +114,12 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
 
   @Override
   public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
-    List<Integer> nullFields = listCoder.decode(inStream);
+    BitSet nullFields = nullListCoder.decode(inStream);
 
     BeamSqlRow record = new BeamSqlRow(sqlRecordType);
     record.setNullFields(nullFields);
     for (int idx = 0; idx < sqlRecordType.size(); ++idx) {
-      if (nullFields.contains(idx)) {
+      if (nullFields.get(idx)) {
         continue;
       }