You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/05 00:04:17 UTC
[11/19] beam git commit: Move ValueWithRecordId to sdk.values,
annotated
Move ValueWithRecordId to sdk.values, annotated
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2553caf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2553caf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2553caf
Branch: refs/heads/master
Commit: b2553caf1350eaea3caefe55d5af414694c96424
Parents: e0b3f80
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 3 20:19:51 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 4 16:06:55 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 2 +-
.../beam/runners/spark/TestSparkRunner.java | 2 +-
.../sdk/io/BoundedReadFromUnboundedSource.java | 2 +-
.../apache/beam/sdk/util/ValueWithRecordId.java | 134 ------------------
.../beam/sdk/values/ValueWithRecordId.java | 138 +++++++++++++++++++
.../beam/sdk/util/ValueWithRecordIdTest.java | 34 -----
.../beam/sdk/values/ValueWithRecordIdTest.java | 34 +++++
7 files changed, 175 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 7123316..57da61b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -121,7 +121,6 @@ import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.util.ReleaseInfo;
-import org.apache.beam.sdk.util.ValueWithRecordId;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
@@ -132,6 +131,7 @@ import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 6808d7b..a6851c4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -48,11 +48,11 @@ import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.ValueWithRecordId;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.commons.io.FileUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index e54176f..d9adf92 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -37,9 +37,9 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.NameUtils;
-import org.apache.beam.sdk.util.ValueWithRecordId;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.ValueWithRecordId;
import org.joda.time.Duration;
import org.joda.time.Instant;
http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
deleted file mode 100644
index 9902aa7..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.base.MoreObjects;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * Immutable struct containing a value as well as a unique id identifying the value.
- *
- * @param <ValueT> the underlying value type
- */
-public class ValueWithRecordId<ValueT> {
- private final ValueT value;
- private final byte[] id;
-
- public ValueWithRecordId(ValueT value, byte[] id) {
- this.value = value;
- this.id = id;
- }
-
- public ValueT getValue() {
- return value;
- }
-
- public byte[] getId() {
- return id;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("id", id)
- .add("value", value)
- .toString();
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (!(other instanceof ValueWithRecordId)) {
- return false;
- }
- ValueWithRecordId<?> otherRecord = (ValueWithRecordId<?>) other;
- return Objects.deepEquals(id, otherRecord.id)
- && Objects.deepEquals(value, otherRecord.value);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(Arrays.hashCode(id), value);
- }
-
- /**
- * A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}.
- */
- public static class ValueWithRecordIdCoder<ValueT>
- extends CustomCoder<ValueWithRecordId<ValueT>> {
- public static <ValueT> ValueWithRecordIdCoder<ValueT> of(Coder<ValueT> valueCoder) {
- return new ValueWithRecordIdCoder<>(valueCoder);
- }
-
- protected ValueWithRecordIdCoder(Coder<ValueT> valueCoder) {
- this.valueCoder = valueCoder;
- this.idCoder = ByteArrayCoder.of();
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Arrays.asList(valueCoder);
- }
-
- @Override
- public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream, Context context)
- throws IOException {
- valueCoder.encode(value.value, outStream, context.nested());
- idCoder.encode(value.id, outStream, context);
- }
-
- @Override
- public ValueWithRecordId<ValueT> decode(InputStream inStream, Context context)
- throws IOException {
- return new ValueWithRecordId<ValueT>(
- valueCoder.decode(inStream, context.nested()),
- idCoder.decode(inStream, context));
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- valueCoder.verifyDeterministic();
- }
-
- public Coder<ValueT> getValueCoder() {
- return valueCoder;
- }
-
- Coder<ValueT> valueCoder;
- ByteArrayCoder idCoder;
- }
-
- /** {@link DoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. */
- public static class StripIdsDoFn<T> extends DoFn<ValueWithRecordId<T>, T> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().getValue());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
new file mode 100644
index 0000000..0d92f40
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import com.google.common.base.MoreObjects;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * <b>For internal use only; no backwards compatibility guarantees.</b>
+ *
+ * <p>Immutable struct containing a value as well as a unique id identifying the value.
+ *
+ * @param <ValueT> the underlying value type
+ */
+@Internal
+public class ValueWithRecordId<ValueT> {
+ private final ValueT value;
+ private final byte[] id;
+
+ public ValueWithRecordId(ValueT value, byte[] id) {
+ this.value = value;
+ this.id = id;
+ }
+
+ public ValueT getValue() {
+ return value;
+ }
+
+ public byte[] getId() {
+ return id;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("id", id)
+ .add("value", value)
+ .toString();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof ValueWithRecordId)) {
+ return false;
+ }
+ ValueWithRecordId<?> otherRecord = (ValueWithRecordId<?>) other;
+ return Objects.deepEquals(id, otherRecord.id)
+ && Objects.deepEquals(value, otherRecord.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(Arrays.hashCode(id), value);
+ }
+
+ /**
+ * A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}.
+ */
+ public static class ValueWithRecordIdCoder<ValueT>
+ extends CustomCoder<ValueWithRecordId<ValueT>> {
+ public static <ValueT> ValueWithRecordIdCoder<ValueT> of(Coder<ValueT> valueCoder) {
+ return new ValueWithRecordIdCoder<>(valueCoder);
+ }
+
+ protected ValueWithRecordIdCoder(Coder<ValueT> valueCoder) {
+ this.valueCoder = valueCoder;
+ this.idCoder = ByteArrayCoder.of();
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Arrays.asList(valueCoder);
+ }
+
+ @Override
+ public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream, Context context)
+ throws IOException {
+ valueCoder.encode(value.value, outStream, context.nested());
+ idCoder.encode(value.id, outStream, context);
+ }
+
+ @Override
+ public ValueWithRecordId<ValueT> decode(InputStream inStream, Context context)
+ throws IOException {
+ return new ValueWithRecordId<ValueT>(
+ valueCoder.decode(inStream, context.nested()),
+ idCoder.decode(inStream, context));
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ valueCoder.verifyDeterministic();
+ }
+
+ public Coder<ValueT> getValueCoder() {
+ return valueCoder;
+ }
+
+ Coder<ValueT> valueCoder;
+ ByteArrayCoder idCoder;
+ }
+
+ /** {@link DoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. */
+ public static class StripIdsDoFn<T> extends DoFn<ValueWithRecordId<T>, T> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.element().getValue());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java
deleted file mode 100644
index e3a2dc6..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.ValueWithRecordId.ValueWithRecordIdCoder;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link ValueWithRecordId}. */
-@RunWith(JUnit4.class)
-public class ValueWithRecordIdTest {
- @Test
- public void testCoderIsSerializableWithWellKnownCoderType() {
- CoderProperties.coderSerializable(ValueWithRecordIdCoder.of(GlobalWindow.Coder.INSTANCE));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/test/java/org/apache/beam/sdk/values/ValueWithRecordIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/ValueWithRecordIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/ValueWithRecordIdTest.java
new file mode 100644
index 0000000..987c9af
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/ValueWithRecordIdTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ValueWithRecordId}. */
+@RunWith(JUnit4.class)
+public class ValueWithRecordIdTest {
+ @Test
+ public void testCoderIsSerializableWithWellKnownCoderType() {
+ CoderProperties.coderSerializable(ValueWithRecordIdCoder.of(GlobalWindow.Coder.INSTANCE));
+ }
+}