You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/05 00:04:17 UTC

[11/19] beam git commit: Move ValueWithRecordId to sdk.values, annotated

Move ValueWithRecordId to sdk.values, annotated


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2553caf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2553caf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2553caf

Branch: refs/heads/master
Commit: b2553caf1350eaea3caefe55d5af414694c96424
Parents: e0b3f80
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 3 20:19:51 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 4 16:06:55 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |   2 +-
 .../beam/runners/spark/TestSparkRunner.java     |   2 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |   2 +-
 .../apache/beam/sdk/util/ValueWithRecordId.java | 134 ------------------
 .../beam/sdk/values/ValueWithRecordId.java      | 138 +++++++++++++++++++
 .../beam/sdk/util/ValueWithRecordIdTest.java    |  34 -----
 .../beam/sdk/values/ValueWithRecordIdTest.java  |  34 +++++
 7 files changed, 175 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 7123316..57da61b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -121,7 +121,6 @@ import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.ReleaseInfo;
-import org.apache.beam.sdk.util.ValueWithRecordId;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
@@ -132,6 +131,7 @@ import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.DateTimeUtils;
 import org.joda.time.DateTimeZone;

http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 6808d7b..a6851c4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -48,11 +48,11 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.ValueWithRecordId;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.commons.io.FileUtils;
 import org.joda.time.Duration;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index e54176f..d9adf92 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -37,9 +37,9 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.NameUtils;
-import org.apache.beam.sdk.util.ValueWithRecordId;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
deleted file mode 100644
index 9902aa7..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.base.MoreObjects;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * Immutable struct containing a value as well as a unique id identifying the value.
- *
- * @param <ValueT> the underlying value type
- */
-public class ValueWithRecordId<ValueT> {
-  private final ValueT value;
-  private final byte[] id;
-
-  public ValueWithRecordId(ValueT value, byte[] id) {
-    this.value = value;
-    this.id = id;
-  }
-
-  public ValueT getValue() {
-    return value;
-  }
-
-  public byte[] getId() {
-    return id;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .add("id", id)
-        .add("value", value)
-        .toString();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) {
-      return true;
-    }
-    if (!(other instanceof ValueWithRecordId)) {
-      return false;
-    }
-    ValueWithRecordId<?> otherRecord = (ValueWithRecordId<?>) other;
-    return Objects.deepEquals(id, otherRecord.id)
-        && Objects.deepEquals(value, otherRecord.value);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(Arrays.hashCode(id), value);
-  }
-
-  /**
-   * A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}.
-   */
-  public static class ValueWithRecordIdCoder<ValueT>
-      extends CustomCoder<ValueWithRecordId<ValueT>> {
-    public static <ValueT> ValueWithRecordIdCoder<ValueT> of(Coder<ValueT> valueCoder) {
-      return new ValueWithRecordIdCoder<>(valueCoder);
-    }
-
-    protected ValueWithRecordIdCoder(Coder<ValueT> valueCoder) {
-      this.valueCoder = valueCoder;
-      this.idCoder = ByteArrayCoder.of();
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return Arrays.asList(valueCoder);
-    }
-
-    @Override
-    public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream, Context context)
-        throws IOException {
-      valueCoder.encode(value.value, outStream, context.nested());
-      idCoder.encode(value.id, outStream, context);
-    }
-
-    @Override
-    public ValueWithRecordId<ValueT> decode(InputStream inStream, Context context)
-        throws IOException {
-      return new ValueWithRecordId<ValueT>(
-          valueCoder.decode(inStream, context.nested()),
-          idCoder.decode(inStream, context));
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      valueCoder.verifyDeterministic();
-    }
-
-    public Coder<ValueT> getValueCoder() {
-      return valueCoder;
-    }
-
-    Coder<ValueT> valueCoder;
-    ByteArrayCoder idCoder;
-  }
-
-  /** {@link DoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. */
-  public static class StripIdsDoFn<T> extends DoFn<ValueWithRecordId<T>, T> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(c.element().getValue());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
new file mode 100644
index 0000000..0d92f40
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import com.google.common.base.MoreObjects;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * <b>For internal use only; no backwards compatibility guarantees.</b>
+ *
+ * <p>Immutable struct containing a value as well as a unique id identifying the value.
+ *
+ * @param <ValueT> the underlying value type
+ */
+@Internal
+public class ValueWithRecordId<ValueT> {
+  private final ValueT value;
+  private final byte[] id;
+
+  public ValueWithRecordId(ValueT value, byte[] id) {
+    this.value = value;
+    this.id = id;
+  }
+
+  public ValueT getValue() {
+    return value;
+  }
+
+  public byte[] getId() {
+    return id;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("id", id)
+        .add("value", value)
+        .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (!(other instanceof ValueWithRecordId)) {
+      return false;
+    }
+    ValueWithRecordId<?> otherRecord = (ValueWithRecordId<?>) other;
+    return Objects.deepEquals(id, otherRecord.id)
+        && Objects.deepEquals(value, otherRecord.value);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(Arrays.hashCode(id), value);
+  }
+
+  /**
+   * A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}.
+   */
+  public static class ValueWithRecordIdCoder<ValueT>
+      extends CustomCoder<ValueWithRecordId<ValueT>> {
+    public static <ValueT> ValueWithRecordIdCoder<ValueT> of(Coder<ValueT> valueCoder) {
+      return new ValueWithRecordIdCoder<>(valueCoder);
+    }
+
+    protected ValueWithRecordIdCoder(Coder<ValueT> valueCoder) {
+      this.valueCoder = valueCoder;
+      this.idCoder = ByteArrayCoder.of();
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Arrays.asList(valueCoder);
+    }
+
+    @Override
+    public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream, Context context)
+        throws IOException {
+      valueCoder.encode(value.value, outStream, context.nested());
+      idCoder.encode(value.id, outStream, context);
+    }
+
+    @Override
+    public ValueWithRecordId<ValueT> decode(InputStream inStream, Context context)
+        throws IOException {
+      return new ValueWithRecordId<ValueT>(
+          valueCoder.decode(inStream, context.nested()),
+          idCoder.decode(inStream, context));
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      valueCoder.verifyDeterministic();
+    }
+
+    public Coder<ValueT> getValueCoder() {
+      return valueCoder;
+    }
+
+    Coder<ValueT> valueCoder;
+    ByteArrayCoder idCoder;
+  }
+
+  /** {@link DoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. */
+  public static class StripIdsDoFn<T> extends DoFn<ValueWithRecordId<T>, T> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().getValue());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java
deleted file mode 100644
index e3a2dc6..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.ValueWithRecordId.ValueWithRecordIdCoder;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link ValueWithRecordId}. */
-@RunWith(JUnit4.class)
-public class ValueWithRecordIdTest {
-  @Test
-  public void testCoderIsSerializableWithWellKnownCoderType() {
-    CoderProperties.coderSerializable(ValueWithRecordIdCoder.of(GlobalWindow.Coder.INSTANCE));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/test/java/org/apache/beam/sdk/values/ValueWithRecordIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/ValueWithRecordIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/ValueWithRecordIdTest.java
new file mode 100644
index 0000000..987c9af
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/ValueWithRecordIdTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ValueWithRecordId}. */
+@RunWith(JUnit4.class)
+public class ValueWithRecordIdTest {
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(ValueWithRecordIdCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
+}