You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/27 00:23:56 UTC

[2/3] beam git commit: [BEAM-1871] Clean-up org.apache.beam.sdk.util, move BitSetCoder from util to coder

[BEAM-1871] Clean-up org.apache.beam.sdk.util, move BitSetCoder from util to coder


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/11a20ffb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/11a20ffb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/11a20ffb

Branch: refs/heads/master
Commit: 11a20ffb8e1f4df4028d16551864bf36027d47a2
Parents: a32371e
Author: Luke Cwik <lc...@google.com>
Authored: Wed Apr 26 13:38:37 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Apr 26 17:23:19 2017 -0700

----------------------------------------------------------------------
 .../triggers/TriggerStateMachineRunner.java     |   2 +-
 .../org/apache/beam/sdk/coders/BitSetCoder.java |  63 ++++++++++++
 .../apache/beam/sdk/coders/CoderRegistry.java   |   2 +
 .../org/apache/beam/sdk/util/BitSetCoder.java   |  59 -----------
 .../apache/beam/sdk/coders/BitSetCoderTest.java | 103 +++++++++++++++++++
 5 files changed, 169 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/11a20ffb/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index 542439f..e26241a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -28,8 +28,8 @@ import org.apache.beam.runners.core.MergingStateAccessor;
 import org.apache.beam.runners.core.StateAccessor;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.sdk.coders.BitSetCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.BitSetCoder;
 import org.apache.beam.sdk.util.Timers;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/11a20ffb/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
new file mode 100644
index 0000000..5a4db24
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.BitSet;
+
+/**
+ * Coder for {@link BitSet}.
+ */
+public class BitSetCoder extends CustomCoder<BitSet> {
+  private static final BitSetCoder INSTANCE = new BitSetCoder();
+  private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
+
+  private BitSetCoder() {}
+
+  public static BitSetCoder of() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(BitSet value, OutputStream outStream, Context context)
+      throws CoderException, IOException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null BitSet");
+    }
+    BYTE_ARRAY_CODER.encodeAndOwn(value.toByteArray(), outStream, context);
+  }
+
+  @Override
+  public BitSet decode(InputStream inStream, Context context)
+      throws CoderException, IOException {
+    return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context));
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    verifyDeterministic(
+        "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/11a20ffb/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index cca08d3..ab0a3e1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -33,6 +33,7 @@ import java.lang.reflect.TypeVariable;
 import java.lang.reflect.WildcardType;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -91,6 +92,7 @@ public class CoderRegistry implements CoderProvider {
     // Register the standard coders first so they are chosen as the default
     Multimap<Class<?>, CoderFactory> codersToRegister = HashMultimap.create();
     codersToRegister.put(Byte.class, CoderFactories.fromStaticMethods(ByteCoder.class));
+    codersToRegister.put(BitSet.class, CoderFactories.fromStaticMethods(BitSetCoder.class));
     codersToRegister.put(Double.class, CoderFactories.fromStaticMethods(DoubleCoder.class));
     codersToRegister.put(Instant.class, CoderFactories.fromStaticMethods(InstantCoder.class));
     codersToRegister.put(Integer.class, CoderFactories.fromStaticMethods(VarIntCoder.class));

http://git-wip-us.apache.org/repos/asf/beam/blob/11a20ffb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
deleted file mode 100644
index eda4e5f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.BitSet;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-
-/**
- * Coder for the BitSet used to track child-trigger finished states.
- */
-public class BitSetCoder extends CustomCoder<BitSet> {
-
-  private static final BitSetCoder INSTANCE = new BitSetCoder();
-  private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
-
-  private BitSetCoder() {}
-
-  public static BitSetCoder of() {
-    return INSTANCE;
-  }
-
-  @Override
-  public void encode(BitSet value, OutputStream outStream, Context context)
-      throws CoderException, IOException {
-    BYTE_ARRAY_CODER.encodeAndOwn(value.toByteArray(), outStream, context);
-  }
-
-  @Override
-  public BitSet decode(InputStream inStream, Context context)
-      throws CoderException, IOException {
-    return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context));
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(
-        "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/11a20ffb/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BitSetCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BitSetCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BitSetCoderTest.java
new file mode 100644
index 0000000..b440f21
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BitSetCoderTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BitSetCoder}. */
+@RunWith(JUnit4.class)
+public class BitSetCoderTest {
+  private static final Coder<BitSet> TEST_CODER = BitSetCoder.of();
+
+  private static final List<BitSet> TEST_VALUES = Arrays.asList(
+      BitSet.valueOf(new byte[]{0xa, 0xb, 0xc}),
+      BitSet.valueOf(new byte[]{0xd, 0x3}),
+      BitSet.valueOf(new byte[]{0xd, 0xe}),
+      BitSet.valueOf(new byte[]{}));
+
+  @Test
+  public void testDecodeEncodeEquals() throws Exception {
+    for (BitSet value : TEST_VALUES) {
+      CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
+    }
+  }
+
+  @Test
+  public void testRegisterByteSizeObserver() throws Exception {
+    CoderProperties.testByteCount(ByteArrayCoder.of(), Coder.Context.OUTER,
+        new byte[][]{{ 0xa, 0xb, 0xc }});
+
+    CoderProperties.testByteCount(ByteArrayCoder.of(), Coder.Context.NESTED,
+        new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}});
+  }
+
+  @Test
+  public void testStructuralValueConsistentWithEquals() throws Exception {
+    // We know that bi array coders are NOT compatible with equals
+    // (aka injective w.r.t. Object.equals)
+    for (BitSet value1 : TEST_VALUES) {
+      for (BitSet value2 : TEST_VALUES) {
+        CoderProperties.structuralValueConsistentWithEquals(TEST_CODER, value1, value2);
+      }
+    }
+  }
+
+  /**
+   * Generated data to check that the wire format has not changed. To regenerate, see
+   * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
+   */
+  private static final List<String> TEST_ENCODINGS = Arrays.asList(
+      "CgsM",
+      "DQM",
+      "DQ4",
+      "");
+
+  @Test
+  public void testWireFormatEncode() throws Exception {
+    CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
+  }
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void encodeNullThrowsCoderException() throws Exception {
+    thrown.expect(CoderException.class);
+    thrown.expectMessage("cannot encode a null BitSet");
+
+    CoderUtils.encodeToBase64(TEST_CODER, null);
+  }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(BitSet.class)));
+  }
+}