You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/27 00:23:56 UTC
[2/3] beam git commit: [BEAM-1871] Clean-up org.apache.beam.sdk.util,
move BitSetCoder from util to coder
[BEAM-1871] Clean-up org.apache.beam.sdk.util, move BitSetCoder from util to coder
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/11a20ffb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/11a20ffb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/11a20ffb
Branch: refs/heads/master
Commit: 11a20ffb8e1f4df4028d16551864bf36027d47a2
Parents: a32371e
Author: Luke Cwik <lc...@google.com>
Authored: Wed Apr 26 13:38:37 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Apr 26 17:23:19 2017 -0700
----------------------------------------------------------------------
.../triggers/TriggerStateMachineRunner.java | 2 +-
.../org/apache/beam/sdk/coders/BitSetCoder.java | 63 ++++++++++++
.../apache/beam/sdk/coders/CoderRegistry.java | 2 +
.../org/apache/beam/sdk/util/BitSetCoder.java | 59 -----------
.../apache/beam/sdk/coders/BitSetCoderTest.java | 103 +++++++++++++++++++
5 files changed, 169 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/11a20ffb/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index 542439f..e26241a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -28,8 +28,8 @@ import org.apache.beam.runners.core.MergingStateAccessor;
import org.apache.beam.runners.core.StateAccessor;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.sdk.coders.BitSetCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.BitSetCoder;
import org.apache.beam.sdk.util.Timers;
import org.apache.beam.sdk.util.state.ValueState;
import org.joda.time.Instant;
http://git-wip-us.apache.org/repos/asf/beam/blob/11a20ffb/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
new file mode 100644
index 0000000..5a4db24
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.BitSet;
+
+/**
+ * Coder for {@link BitSet}.
+ */
+public class BitSetCoder extends CustomCoder<BitSet> {
+ private static final BitSetCoder INSTANCE = new BitSetCoder();
+ private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
+
+ private BitSetCoder() {}
+
+ public static BitSetCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(BitSet value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null BitSet");
+ }
+ BYTE_ARRAY_CODER.encodeAndOwn(value.toByteArray(), outStream, context);
+ }
+
+ @Override
+ public BitSet decode(InputStream inStream, Context context)
+ throws CoderException, IOException {
+ return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context));
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ verifyDeterministic(
+ "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/11a20ffb/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index cca08d3..ab0a3e1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -33,6 +33,7 @@ import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -91,6 +92,7 @@ public class CoderRegistry implements CoderProvider {
// Register the standard coders first so they are chosen as the default
Multimap<Class<?>, CoderFactory> codersToRegister = HashMultimap.create();
codersToRegister.put(Byte.class, CoderFactories.fromStaticMethods(ByteCoder.class));
+ codersToRegister.put(BitSet.class, CoderFactories.fromStaticMethods(BitSetCoder.class));
codersToRegister.put(Double.class, CoderFactories.fromStaticMethods(DoubleCoder.class));
codersToRegister.put(Instant.class, CoderFactories.fromStaticMethods(InstantCoder.class));
codersToRegister.put(Integer.class, CoderFactories.fromStaticMethods(VarIntCoder.class));
http://git-wip-us.apache.org/repos/asf/beam/blob/11a20ffb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
deleted file mode 100644
index eda4e5f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.BitSet;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-
-/**
- * Coder for the BitSet used to track child-trigger finished states.
- */
-public class BitSetCoder extends CustomCoder<BitSet> {
-
- private static final BitSetCoder INSTANCE = new BitSetCoder();
- private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
-
- private BitSetCoder() {}
-
- public static BitSetCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(BitSet value, OutputStream outStream, Context context)
- throws CoderException, IOException {
- BYTE_ARRAY_CODER.encodeAndOwn(value.toByteArray(), outStream, context);
- }
-
- @Override
- public BitSet decode(InputStream inStream, Context context)
- throws CoderException, IOException {
- return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context));
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic(
- "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/11a20ffb/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BitSetCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BitSetCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BitSetCoderTest.java
new file mode 100644
index 0000000..b440f21
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BitSetCoderTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BitSetCoder}. */
+@RunWith(JUnit4.class)
+public class BitSetCoderTest {
+ private static final Coder<BitSet> TEST_CODER = BitSetCoder.of();
+
+ private static final List<BitSet> TEST_VALUES = Arrays.asList(
+ BitSet.valueOf(new byte[]{0xa, 0xb, 0xc}),
+ BitSet.valueOf(new byte[]{0xd, 0x3}),
+ BitSet.valueOf(new byte[]{0xd, 0xe}),
+ BitSet.valueOf(new byte[]{}));
+
+ @Test
+ public void testDecodeEncodeEquals() throws Exception {
+ for (BitSet value : TEST_VALUES) {
+ CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
+ }
+ }
+
+ @Test
+ public void testRegisterByteSizeObserver() throws Exception {
+ CoderProperties.testByteCount(ByteArrayCoder.of(), Coder.Context.OUTER,
+ new byte[][]{{ 0xa, 0xb, 0xc }});
+
+ CoderProperties.testByteCount(ByteArrayCoder.of(), Coder.Context.NESTED,
+ new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}});
+ }
+
+ @Test
+ public void testStructuralValueConsistentWithEquals() throws Exception {
+ // We know that bi array coders are NOT compatible with equals
+ // (aka injective w.r.t. Object.equals)
+ for (BitSet value1 : TEST_VALUES) {
+ for (BitSet value2 : TEST_VALUES) {
+ CoderProperties.structuralValueConsistentWithEquals(TEST_CODER, value1, value2);
+ }
+ }
+ }
+
+ /**
+ * Generated data to check that the wire format has not changed. To regenerate, see
+ * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
+ */
+ private static final List<String> TEST_ENCODINGS = Arrays.asList(
+ "CgsM",
+ "DQM",
+ "DQ4",
+ "");
+
+ @Test
+ public void testWireFormatEncode() throws Exception {
+ CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
+ }
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void encodeNullThrowsCoderException() throws Exception {
+ thrown.expect(CoderException.class);
+ thrown.expectMessage("cannot encode a null BitSet");
+
+ CoderUtils.encodeToBase64(TEST_CODER, null);
+ }
+
+ @Test
+ public void testEncodedTypeDescriptor() throws Exception {
+ assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(BitSet.class)));
+ }
+}