You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/28 02:28:10 UTC
[1/7] incubator-beam git commit: [BEAM-151] Update worker image name that corresponds to class moves.
Repository: incubator-beam
Updated Branches:
refs/heads/master 81bf4d981 -> e2d5c691e
[BEAM-151] Update worker image name that corresponds to class moves.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b73d187
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b73d187
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b73d187
Branch: refs/heads/master
Commit: 4b73d187648a2026d8d9647f4987671d326b0d81
Parents: 6023d26
Author: Luke Cwik <lc...@google.com>
Authored: Wed Apr 27 16:55:56 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Apr 27 17:26:30 2016 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 1 +
.../org/apache/beam/runners/dataflow/DataflowPipelineRunner.java | 4 ++--
sdks/java/core/pom.xml | 1 +
3 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b73d187/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 23b7f5f..3dd9cb8 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -524,6 +524,7 @@
<version>1.0-rc2</version>
<optional>true</optional>
</dependency>
+
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b73d187/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
index 2f01101..41b4df7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
@@ -217,9 +217,9 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
// Default Docker container images that execute Dataflow worker harness, residing in Google
// Container Registry, separately for Batch and Streaming.
public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE
- = "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20160426";
+ = "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20160427";
public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE
- = "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20160426";
+ = "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20160427";
// The limit of CreateJob request size.
private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b73d187/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 07d2fce..0d530e1 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -697,6 +697,7 @@
<version>1.0-rc2</version>
<optional>true</optional>
</dependency>
+
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
[7/7] incubator-beam git commit: [BEAM-151] This closes #243
Posted by lc...@apache.org.
[BEAM-151] This closes #243
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2d5c691
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2d5c691
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2d5c691
Branch: refs/heads/master
Commit: e2d5c691e06843f9e877d85b719dd00b47668025
Parents: 81bf4d9 4b73d18
Author: Luke Cwik <lc...@google.com>
Authored: Wed Apr 27 17:26:54 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Apr 27 17:26:54 2016 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 7 +
.../runners/dataflow/DataflowPipelineJob.java | 2 +-
.../dataflow/DataflowPipelineRunner.java | 12 +-
.../dataflow/DataflowPipelineTranslator.java | 4 +-
.../runners/dataflow/internal/IsmFormat.java | 811 +++++++++++++++++++
.../beam/runners/dataflow/util/DoFnInfo.java | 69 ++
.../runners/dataflow/util/MonitoringUtil.java | 3 +-
.../runners/dataflow/util/OutputReference.java | 43 +
.../runners/dataflow/util/RandomAccessData.java | 356 ++++++++
.../beam/runners/dataflow/util/TimeUtil.java | 166 ++++
.../dataflow/DataflowPipelineRunnerTest.java | 8 +-
.../DataflowPipelineTranslatorTest.java | 2 +-
.../testing/TestDataflowPipelineRunnerTest.java | 2 +-
.../dataflow/util/MonitoringUtilTest.java | 1 -
.../dataflow/util/RandomAccessDataTest.java | 207 +++++
.../runners/dataflow/util/TimeUtilTest.java | 75 ++
sdks/java/core/pom.xml | 1 +
.../beam/sdk/runners/worker/IsmFormat.java | 811 -------------------
.../beam/sdk/runners/worker/package-info.java | 25 -
.../java/org/apache/beam/sdk/util/DoFnInfo.java | 68 --
.../apache/beam/sdk/util/OutputReference.java | 43 -
.../apache/beam/sdk/util/RandomAccessData.java | 355 --------
.../java/org/apache/beam/sdk/util/TimeUtil.java | 166 ----
.../sdk/util/common/worker/package-info.java | 19 -
.../beam/sdk/util/RandomAccessDataTest.java | 207 -----
.../org/apache/beam/sdk/util/TimeUtilTest.java | 75 --
26 files changed, 1751 insertions(+), 1787 deletions(-)
----------------------------------------------------------------------
[5/7] incubator-beam git commit: [BEAM-151] Move over some more Dataflow specific classes.
Posted by lc...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java
deleted file mode 100644
index 8df46dd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java
+++ /dev/null
@@ -1,811 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.worker;
-
-import static org.apache.beam.sdk.util.Structs.addLong;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.RandomAccessData;
-import org.apache.beam.sdk.util.VarInt;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- * An Ism file is a prefix encoded composite key value file broken into shards. Each composite
- * key is composed of a fixed number of component keys. A fixed number of those sub keys represent
- * the shard key portion; see {@link IsmRecord} and {@link IsmRecordCoder} for further details
- * around the data format. In addition to the data, there is a bloom filter,
- * and multiple indices to allow for efficient retrieval.
- *
- * <p>An Ism file is composed of these high level sections (in order):
- * <ul>
- * <li>shard block</li>
- * <li>bloom filter (See {@code ScalableBloomFilter} for details on encoding format)</li>
- * <li>shard index</li>
- * <li>footer (See {@link Footer} for details on encoding format)</li>
- * </ul>
- *
- * <p>The shard block is composed of multiple copies of the following:
- * <ul>
- * <li>data block</li>
- * <li>data index</li>
- * </ul>
- *
- * <p>The data block is composed of multiple copies of the following:
- * <ul>
- * <li>key prefix (See {@link KeyPrefix} for details on encoding format)</li>
- * <li>unshared key bytes</li>
- * <li>value bytes</li>
- * <li>optional 0x00 0x00 bytes followed by metadata bytes
- * (if the following 0x00 0x00 bytes are not present, then there are no metadata bytes)</li>
- * </ul>
- * Each key written into the data block must be in unsigned lexicographically increasing order
- * and also its shard portion of the key must hash to the same shard id as all other keys
- * within the same data block. The hashing function used is the
- * <a href="http://smhasher.googlecode.com/svn/trunk/MurmurHash3.cpp">
- * 32-bit murmur3 algorithm, x86 variant</a> (little-endian variant),
- * using {@code 1225801234} as the seed value.
- *
- * <p>The data index is composed of {@code N} copies of the following:
- * <ul>
- * <li>key prefix (See {@link KeyPrefix} for details on encoding format)</li>
- * <li>unshared key bytes</li>
- * <li>byte offset to key prefix in data block (variable length long coding)</li>
- * </ul>
- *
- * <p>The shard index is composed of a {@link VarInt variable length integer} encoding representing
- * the number of shard index records followed by that many shard index records.
- * See {@link IsmShardCoder} for further details as to its encoding scheme.
- */
-public class IsmFormat {
- private static final int HASH_SEED = 1225801234;
- private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32(HASH_SEED);
- public static final int SHARD_BITS = 0x7F; // [0-127] shards + [128-255] metadata shards
-
- /**
- * A record containing a composite key and either a value or metadata. The composite key
- * must not contain the metadata key component place holder if producing a value record, and must
- * contain the metadata component key place holder if producing a metadata record.
- *
- * <p>The composite key is a fixed number of component keys where the first {@code N} component
- * keys are used to create a shard id via hashing. See {@link IsmRecordCoder#hash(List)} for
- * further details.
- */
- @AutoValue
- public abstract static class IsmRecord<V> {
- abstract List<?> keyComponents();
- @Nullable abstract V value();
- @Nullable abstract byte[] metadata();
-
- IsmRecord() {} // Prevent public constructor
-
- /** Returns an IsmRecord with the specified key components and value. */
- public static <V> IsmRecord<V> of(List<?> keyComponents, V value) {
- checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
- checkArgument(!isMetadataKey(keyComponents),
- "Expected key components to not contain metadata key.");
- return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, value, null);
- }
-
- public static <V> IsmRecord<V> meta(List<?> keyComponents, byte[] metadata) {
- checkNotNull(metadata);
- checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
- checkArgument(isMetadataKey(keyComponents),
- "Expected key components to contain metadata key.");
- return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, null, metadata);
- }
-
- /** Returns the list of key components. */
- public List<?> getKeyComponents() {
- return keyComponents();
- }
-
- /** Returns the key component at the specified index. */
- public Object getKeyComponent(int index) {
- return keyComponents().get(index);
- }
-
- /**
- * Returns the value. Throws {@link IllegalStateException} if this is not a
- * value record.
- */
- public V getValue() {
- checkState(!isMetadataKey(keyComponents()),
- "This is a metadata record and not a value record.");
- return value();
- }
-
- /**
- * Returns the metadata. Throws {@link IllegalStateException} if this is not a
- * metadata record.
- */
- public byte[] getMetadata() {
- checkState(isMetadataKey(keyComponents()),
- "This is a value record and not a metadata record.");
- return metadata();
- }
- }
-
- /** A {@link Coder} for {@link IsmRecord}s.
- *
- * <p>Note that this coder standalone will not produce an Ism file. This coder can be used
- * to materialize a {@link PCollection} of {@link IsmRecord}s. Only when this coder
- * is combined with an {@link IsmSink} will one produce an Ism file.
- *
- * <p>The {@link IsmRecord} encoded format is:
- * <ul>
- * <li>encoded key component 1 using key component coder 1</li>
- * <li>...</li>
- * <li>encoded key component N using key component coder N</li>
- * <li>encoded value using value coder</li>
- * </ul>
- */
- public static class IsmRecordCoder<V>
- extends StandardCoder<IsmRecord<V>> {
- /** Returns an IsmRecordCoder with the specified key component coders, value coder. */
- public static <V> IsmRecordCoder<V> of(
- int numberOfShardKeyCoders,
- int numberOfMetadataShardKeyCoders,
- List<Coder<?>> keyComponentCoders,
- Coder<V> valueCoder) {
- checkNotNull(keyComponentCoders);
- checkArgument(keyComponentCoders.size() > 0);
- checkArgument(numberOfShardKeyCoders > 0);
- checkArgument(numberOfShardKeyCoders <= keyComponentCoders.size());
- checkArgument(numberOfMetadataShardKeyCoders <= keyComponentCoders.size());
- return new IsmRecordCoder<>(
- numberOfShardKeyCoders,
- numberOfMetadataShardKeyCoders,
- keyComponentCoders,
- valueCoder);
- }
-
- /**
- * Returns an IsmRecordCoder with the specified coders. Note that this method is not meant
- * to be called by users but used by Jackson when decoding this coder.
- */
- @JsonCreator
- public static IsmRecordCoder<?> of(
- @JsonProperty(PropertyNames.NUM_SHARD_CODERS) int numberOfShardCoders,
- @JsonProperty(PropertyNames.NUM_METADATA_SHARD_CODERS) int numberOfMetadataShardCoders,
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
- Preconditions.checkArgument(components.size() >= 2,
- "Expecting at least 2 components, got " + components.size());
- return of(
- numberOfShardCoders,
- numberOfMetadataShardCoders,
- components.subList(0, components.size() - 1),
- components.get(components.size() - 1));
- }
-
- private final int numberOfShardKeyCoders;
- private final int numberOfMetadataShardKeyCoders;
- private final List<Coder<?>> keyComponentCoders;
- private final Coder<V> valueCoder;
-
- private IsmRecordCoder(
- int numberOfShardKeyCoders,
- int numberOfMetadataShardKeyCoders,
- List<Coder<?>> keyComponentCoders, Coder<V> valueCoder) {
- this.numberOfShardKeyCoders = numberOfShardKeyCoders;
- this.numberOfMetadataShardKeyCoders = numberOfMetadataShardKeyCoders;
- this.keyComponentCoders = keyComponentCoders;
- this.valueCoder = valueCoder;
- }
-
- /** Returns the list of key component coders. */
- public List<Coder<?>> getKeyComponentCoders() {
- return keyComponentCoders;
- }
-
- /** Returns the key coder at the specified index. */
- public Coder getKeyComponentCoder(int index) {
- return keyComponentCoders.get(index);
- }
-
- /** Returns the value coder. */
- public Coder<V> getValueCoder() {
- return valueCoder;
- }
-
- @Override
- public void encode(IsmRecord<V> value, OutputStream outStream,
- Coder.Context context) throws CoderException, IOException {
- if (value.getKeyComponents().size() != keyComponentCoders.size()) {
- throw new CoderException(String.format(
- "Expected %s key component(s) but received key component(s) %s.",
- keyComponentCoders.size(), value.getKeyComponents()));
- }
- for (int i = 0; i < keyComponentCoders.size(); ++i) {
- getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream, context.nested());
- }
- if (isMetadataKey(value.getKeyComponents())) {
- ByteArrayCoder.of().encode(value.getMetadata(), outStream, context.nested());
- } else {
- valueCoder.encode(value.getValue(), outStream, context.nested());
- }
- }
-
- @Override
- public IsmRecord<V> decode(InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size());
- for (Coder<?> keyCoder : keyComponentCoders) {
- keyComponents.add(keyCoder.decode(inStream, context.nested()));
- }
- if (isMetadataKey(keyComponents)) {
- return IsmRecord.<V>meta(
- keyComponents, ByteArrayCoder.of().decode(inStream, context.nested()));
- } else {
- return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream, context.nested()));
- }
- }
-
- public int getNumberOfShardKeyCoders(List<?> keyComponents) {
- if (isMetadataKey(keyComponents)) {
- return numberOfMetadataShardKeyCoders;
- } else {
- return numberOfShardKeyCoders;
- }
- }
-
- /**
- * Computes the shard id for the given key component(s).
- *
- * The shard keys are encoded into their byte representations and hashed using the
- * <a href="http://smhasher.googlecode.com/svn/trunk/MurmurHash3.cpp">
- * 32-bit murmur3 algorithm, x86 variant</a> (little-endian variant),
- * using {@code 1225801234} as the seed value. We ensure that shard ids for
- * metadata keys and normal keys do not overlap.
- */
- public <V, T> int hash(List<?> keyComponents) {
- return encodeAndHash(keyComponents, new RandomAccessData(), new ArrayList<Integer>());
- }
-
- /**
- * Computes the shard id for the given key component(s).
- *
- * Mutates {@code keyBytes} such that when returned, contains the encoded
- * version of the key components.
- */
- public <V, T> int encodeAndHash(List<?> keyComponents, RandomAccessData keyBytesToMutate) {
- return encodeAndHash(keyComponents, keyBytesToMutate, new ArrayList<Integer>());
- }
-
- /**
- * Computes the shard id for the given key component(s).
- *
- * Mutates {@code keyBytes} such that when returned, contains the encoded
- * version of the key components. Also, mutates {@code keyComponentByteOffsetsToMutate} to
- * store the location where each key component's encoded byte representation ends within
- * {@code keyBytes}.
- */
- public <V, T> int encodeAndHash(
- List<?> keyComponents,
- RandomAccessData keyBytesToMutate,
- List<Integer> keyComponentByteOffsetsToMutate) {
- checkNotNull(keyComponents);
- checkArgument(keyComponents.size() <= keyComponentCoders.size(),
- "Expected at most %s key component(s) but received %s.",
- keyComponentCoders.size(), keyComponents);
-
- final int numberOfKeyCodersToUse;
- final int shardOffset;
- if (isMetadataKey(keyComponents)) {
- numberOfKeyCodersToUse = numberOfMetadataShardKeyCoders;
- shardOffset = SHARD_BITS + 1;
- } else {
- numberOfKeyCodersToUse = numberOfShardKeyCoders;
- shardOffset = 0;
- }
-
- checkArgument(numberOfKeyCodersToUse <= keyComponents.size(),
- "Expected at least %s key component(s) but received %s.",
- numberOfShardKeyCoders, keyComponents);
-
- try {
- // Encode the shard portion
- for (int i = 0; i < numberOfKeyCodersToUse; ++i) {
- getKeyComponentCoder(i).encode(
- keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
- keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
- }
- int rval = HASH_FUNCTION.hashBytes(
- keyBytesToMutate.array(), 0, keyBytesToMutate.size()).asInt() & SHARD_BITS;
- rval += shardOffset;
-
- // Encode the remainder
- for (int i = numberOfKeyCodersToUse; i < keyComponents.size(); ++i) {
- getKeyComponentCoder(i).encode(
- keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
- keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
- }
- return rval;
- } catch (IOException e) {
- throw new IllegalStateException(
- String.format("Failed to hash %s with coder %s", keyComponents, this), e);
- }
- }
-
- @Override
- public List<Coder<?>> getCoderArguments() {
- return ImmutableList.<Coder<?>>builder()
- .addAll(keyComponentCoders)
- .add(valueCoder)
- .build();
- }
-
- @Override
- public CloudObject asCloudObject() {
- CloudObject cloudObject = super.asCloudObject();
- addLong(cloudObject, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders);
- addLong(cloudObject, PropertyNames.NUM_METADATA_SHARD_CODERS, numberOfMetadataShardKeyCoders);
- return cloudObject;
- }
-
- @Override
- public void verifyDeterministic() throws Coder.NonDeterministicException {
- verifyDeterministic("Key component coders expected to be deterministic.", keyComponentCoders);
- verifyDeterministic("Value coder expected to be deterministic.", valueCoder);
- }
-
- @Override
- public boolean consistentWithEquals() {
- for (Coder<?> keyComponentCoder : keyComponentCoders) {
- if (!keyComponentCoder.consistentWithEquals()) {
- return false;
- }
- }
- return valueCoder.consistentWithEquals();
- }
-
- @Override
- public Object structuralValue(IsmRecord<V> record) throws Exception {
- checkState(record.getKeyComponents().size() == keyComponentCoders.size(),
- "Expected the number of key component coders %s "
- + "to match the number of key components %s.",
- keyComponentCoders.size(), record.getKeyComponents());
-
- if (record != null && consistentWithEquals()) {
- ArrayList<Object> keyComponentStructuralValues = new ArrayList<>();
- for (int i = 0; i < keyComponentCoders.size(); ++i) {
- keyComponentStructuralValues.add(
- getKeyComponentCoder(i).structuralValue(record.getKeyComponent(i)));
- }
- if (isMetadataKey(record.getKeyComponents())) {
- return IsmRecord.meta(keyComponentStructuralValues, record.getMetadata());
- } else {
- return IsmRecord.of(keyComponentStructuralValues,
- valueCoder.structuralValue(record.getValue()));
- }
- }
- return super.structuralValue(record);
- }
- }
-
- /**
- * Validates that the key portion of the given coder is deterministic.
- */
- public static void validateCoderIsCompatible(IsmRecordCoder<?> coder) {
- for (Coder<?> keyComponentCoder : coder.getKeyComponentCoders()) {
- try {
- keyComponentCoder.verifyDeterministic();
- } catch (NonDeterministicException e) {
- throw new IllegalArgumentException(
- String.format("Key component coder %s is expected to be deterministic.",
- keyComponentCoder), e);
- }
- }
- }
-
- /** Returns true if and only if any of the passed in key components represent a metadata key. */
- public static boolean isMetadataKey(List<?> keyComponents) {
- for (Object keyComponent : keyComponents) {
- if (keyComponent == METADATA_KEY) {
- return true;
- }
- }
- return false;
- }
-
- /** A marker object representing the wildcard metadata key component. */
- private static final Object METADATA_KEY = new Object() {
- @Override
- public String toString() {
- return "META";
- }
-
- @Override
- public boolean equals(Object obj) {
- return this == obj;
- }
-
- @Override
- public int hashCode() {
- return -1248902349;
- }
- };
-
- /**
- * An object representing a wild card for a key component.
- * Encoded using {@link MetadataKeyCoder}.
- */
- public static Object getMetadataKey() {
- return METADATA_KEY;
- }
-
- /**
- * A coder for metadata key component. Can be used to wrap key component coder allowing for
- * the metadata key component to be used as a place holder instead of an actual key.
- */
- public static class MetadataKeyCoder<K> extends StandardCoder<K> {
- public static <K> MetadataKeyCoder<K> of(Coder<K> keyCoder) {
- checkNotNull(keyCoder);
- return new MetadataKeyCoder<>(keyCoder);
- }
-
- /**
- * Returns an IsmRecordCoder with the specified coders. Note that this method is not meant
- * to be called by users but used by Jackson when decoding this coder.
- */
- @JsonCreator
- public static MetadataKeyCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
- Preconditions.checkArgument(components.size() == 1,
- "Expecting one component, got " + components.size());
- return of(components.get(0));
- }
-
- private final Coder<K> keyCoder;
-
- private MetadataKeyCoder(Coder<K> keyCoder) {
- this.keyCoder = keyCoder;
- }
-
- public Coder<K> getKeyCoder() {
- return keyCoder;
- }
-
- @Override
- public void encode(K value, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- if (value == METADATA_KEY) {
- outStream.write(0);
- } else {
- outStream.write(1);
- keyCoder.encode(value, outStream, context.nested());
- }
- }
-
- @Override
- public K decode(InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- int marker = inStream.read();
- if (marker == 0) {
- return (K) getMetadataKey();
- } else if (marker == 1) {
- return keyCoder.decode(inStream, context.nested());
- } else {
- throw new CoderException(String.format("Expected marker but got %s.", marker));
- }
- }
-
- @Override
- public List<Coder<?>> getCoderArguments() {
- return ImmutableList.<Coder<?>>of(keyCoder);
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic("Expected key coder to be deterministic", keyCoder);
- }
- }
-
- /**
- * A shard descriptor containing shard id, the data block offset, and the index offset for the
- * given shard.
- */
- @AutoValue
- public abstract static class IsmShard {
- abstract int id();
- abstract long blockOffset();
- abstract long indexOffset();
-
- IsmShard() {}
-
- /** Returns an IsmShard with the given id, block offset and no index offset. */
- public static IsmShard of(int id, long blockOffset) {
- IsmShard ismShard = new AutoValue_IsmFormat_IsmShard(id, blockOffset, -1);
- checkState(id >= 0,
- "%s attempting to be written with negative shard id.",
- ismShard);
- checkState(blockOffset >= 0,
- "%s attempting to be written with negative block offset.",
- ismShard);
- return ismShard;
- }
-
- /** Returns an IsmShard with the given id, block offset, and index offset. */
- public static IsmShard of(int id, long blockOffset, long indexOffset) {
- IsmShard ismShard = new AutoValue_IsmFormat_IsmShard(id, blockOffset, indexOffset);
- checkState(id >= 0,
- "%s attempting to be written with negative shard id.",
- ismShard);
- checkState(blockOffset >= 0,
- "%s attempting to be written with negative block offset.",
- ismShard);
- checkState(indexOffset >= 0,
- "%s attempting to be written with negative index offset.",
- ismShard);
- return ismShard;
- }
-
- /** Return the shard id. */
- public int getId() {
- return id();
- }
-
- /** Return the absolute position within the Ism file where the data block begins. */
- public long getBlockOffset() {
- return blockOffset();
- }
-
- /**
- * Return the absolute position within the Ism file where the index block begins.
- * Throws {@link IllegalStateException} if the index offset was never specified.
- */
- public long getIndexOffset() {
- checkState(indexOffset() >= 0,
- "Unable to fetch index offset because it was never specified.");
- return indexOffset();
- }
-
- /** Returns a new IsmShard like this one with the specified index offset. */
- public IsmShard withIndexOffset(long indexOffset) {
- return of(id(), blockOffset(), indexOffset);
- }
- }
-
- /**
- * A {@link ListCoder} wrapping a {@link IsmShardCoder} used to encode the shard index.
- * See {@link ListCoder} for its encoding specification and {@link IsmShardCoder} for its
- * encoding specification.
- */
- public static final Coder<List<IsmShard>> ISM_SHARD_INDEX_CODER =
- ListCoder.of(IsmShardCoder.of());
-
- /**
- * A coder for {@link IsmShard}s.
- *
- * The shard descriptor is encoded as:
- * <ul>
- * <li>id (variable length integer encoding)</li>
- * <li>blockOffset (variable length long encoding)</li>
- * <li>indexOffset (variable length long encoding)</li>
- * </ul>
- */
- public static class IsmShardCoder extends AtomicCoder<IsmShard> {
- private static final IsmShardCoder INSTANCE = new IsmShardCoder();
-
- /** Returns an IsmShardCoder. */
- @JsonCreator
- public static IsmShardCoder of() {
- return INSTANCE;
- }
-
- private IsmShardCoder() {
- }
-
- @Override
- public void encode(IsmShard value, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- checkState(value.getIndexOffset() >= 0,
- "%s attempting to be written without index offset.",
- value);
- VarIntCoder.of().encode(value.getId(), outStream, context.nested());
- VarLongCoder.of().encode(value.getBlockOffset(), outStream, context.nested());
- VarLongCoder.of().encode(value.getIndexOffset(), outStream, context.nested());
- }
-
- @Override
- public IsmShard decode(
- InputStream inStream, Coder.Context context) throws CoderException, IOException {
- return IsmShard.of(
- VarIntCoder.of().decode(inStream, context),
- VarLongCoder.of().decode(inStream, context),
- VarLongCoder.of().decode(inStream, context));
- }
-
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
- }
-
- /**
- * The prefix used before each key which contains the number of shared and unshared
- * bytes from the previous key that was read. The key prefix along with the previous key
- * and the unshared key bytes allows one to construct the current key by doing the following
- * {@code currentKey = previousKey[0 : sharedBytes] + read(unsharedBytes)}.
- *
- * <p>The key prefix is encoded as:
- * <ul>
- * <li>number of shared key bytes (variable length integer coding)</li>
- * <li>number of unshared key bytes (variable length integer coding)</li>
- * </ul>
- */
- @AutoValue
- public abstract static class KeyPrefix {
- public abstract int getSharedKeySize();
- public abstract int getUnsharedKeySize();
-
- public static KeyPrefix of(int sharedKeySize, int unsharedKeySize) {
- return new AutoValue_IsmFormat_KeyPrefix(sharedKeySize, unsharedKeySize);
- }
- }
-
- /** A {@link Coder} for {@link KeyPrefix}. */
- public static final class KeyPrefixCoder extends AtomicCoder<KeyPrefix> {
- private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder();
-
- @JsonCreator
- public static KeyPrefixCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(KeyPrefix value, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- VarInt.encode(value.getSharedKeySize(), outStream);
- VarInt.encode(value.getUnsharedKeySize(), outStream);
- }
-
- @Override
- public KeyPrefix decode(InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- return KeyPrefix.of(VarInt.decodeInt(inStream), VarInt.decodeInt(inStream));
- }
-
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- @Override
- public boolean isRegisterByteSizeObserverCheap(KeyPrefix value, Coder.Context context) {
- return true;
- }
-
- @Override
- public long getEncodedElementByteSize(KeyPrefix value, Coder.Context context)
- throws Exception {
- Preconditions.checkNotNull(value);
- return VarInt.getLength(value.getSharedKeySize())
- + VarInt.getLength(value.getUnsharedKeySize());
- }
- }
-
- /**
- * The footer stores the relevant information required to locate the index and bloom filter.
- * It also stores a version byte and the number of keys stored.
- *
- * <p>The footer is encoded as the value containing:
- * <ul>
- * <li>start of bloom filter offset (big endian long coding)</li>
- * <li>start of shard index position offset (big endian long coding)</li>
- * <li>number of keys in file (big endian long coding)</li>
- * <li>0x01 (version key as a single byte)</li>
- * </ul>
- */
- @AutoValue
- public abstract static class Footer {
- public static final int LONG_BYTES = 8;
- public static final int FIXED_LENGTH = 3 * LONG_BYTES + 1;
- public static final byte VERSION = 2;
-
- public abstract byte getVersion();
- public abstract long getIndexPosition();
- public abstract long getBloomFilterPosition();
- public abstract long getNumberOfKeys();
-
- public static Footer of(long indexPosition, long bloomFilterPosition, long numberOfKeys) {
- return new AutoValue_IsmFormat_Footer(
- VERSION, indexPosition, bloomFilterPosition, numberOfKeys);
- }
- }
-
- /** A {@link Coder} for {@link Footer}. */
- public static final class FooterCoder extends AtomicCoder<Footer> {
- private static final FooterCoder INSTANCE = new FooterCoder();
-
- @JsonCreator
- public static FooterCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(Footer value, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- DataOutputStream dataOut = new DataOutputStream(outStream);
- dataOut.writeLong(value.getIndexPosition());
- dataOut.writeLong(value.getBloomFilterPosition());
- dataOut.writeLong(value.getNumberOfKeys());
- dataOut.write(Footer.VERSION);
- }
-
- @Override
- public Footer decode(InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- DataInputStream dataIn = new DataInputStream(inStream);
- Footer footer = Footer.of(dataIn.readLong(), dataIn.readLong(), dataIn.readLong());
- int version = dataIn.read();
- if (version != Footer.VERSION) {
- throw new IOException("Unknown version " + version + ". "
- + "Only version 2 is currently supported.");
- }
- return footer;
- }
-
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- @Override
- public boolean isRegisterByteSizeObserverCheap(Footer value, Coder.Context context) {
- return true;
- }
-
- @Override
- public long getEncodedElementByteSize(Footer value, Coder.Context context)
- throws Exception {
- return Footer.FIXED_LENGTH;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java
deleted file mode 100644
index 6133148..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Implementation of the harness that runs on each Google Compute Engine instance to coordinate
- * execution of Pipeline code.
- */
-@ParametersAreNonnullByDefault
-package org.apache.beam.sdk.runners.worker;
-
-import javax.annotation.ParametersAreNonnullByDefault;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
deleted file mode 100644
index ae19a17..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import java.io.Serializable;
-
-/**
- * Wrapper class holding the necessary information to serialize a DoFn.
- *
- * @param <InputT> the type of the (main) input elements of the DoFn
- * @param <OutputT> the type of the (main) output elements of the DoFn
- */
-public class DoFnInfo<InputT, OutputT> implements Serializable {
- private final DoFn<InputT, OutputT> doFn;
- private final WindowingStrategy<?, ?> windowingStrategy;
- private final Iterable<PCollectionView<?>> sideInputViews;
- private final Coder<InputT> inputCoder;
-
- public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) {
- this.doFn = doFn;
- this.windowingStrategy = windowingStrategy;
- this.sideInputViews = null;
- this.inputCoder = null;
- }
-
- public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy,
- Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> inputCoder) {
- this.doFn = doFn;
- this.windowingStrategy = windowingStrategy;
- this.sideInputViews = sideInputViews;
- this.inputCoder = inputCoder;
- }
-
- public DoFn<InputT, OutputT> getDoFn() {
- return doFn;
- }
-
- public WindowingStrategy<?, ?> getWindowingStrategy() {
- return windowingStrategy;
- }
-
- public Iterable<PCollectionView<?>> getSideInputViews() {
- return sideInputViews;
- }
-
- public Coder<InputT> getInputCoder() {
- return inputCoder;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/sdks/java/core/src/main/java/org/apache/beam/sdk/util/OutputReference.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/OutputReference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/OutputReference.java
deleted file mode 100644
index 5e30172..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/OutputReference.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.api.client.util.Preconditions.checkNotNull;
-
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.util.Key;
-
-/**
- * A representation used by {@link com.google.api.services.dataflow.model.Step}s
- * to reference the output of other {@code Step}s.
- */
-public final class OutputReference extends GenericJson {
- @Key("@type")
- public final String type = "OutputReference";
-
- @Key("step_name")
- private final String stepName;
-
- @Key("output_name")
- private final String outputName;
-
- public OutputReference(String stepName, String outputName) {
- this.stepName = checkNotNull(stepName);
- this.outputName = checkNotNull(outputName);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java
deleted file mode 100644
index 2d902f4..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.io.ByteStreams;
-import com.google.common.primitives.UnsignedBytes;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Comparator;
-
-import javax.annotation.concurrent.NotThreadSafe;
-
-/**
- * An elastic-sized byte array which allows you to manipulate it as a stream, or access
- * it directly. This allows for a quick succession of moving bytes from an {@link InputStream}
- * to this wrapper to be used as an {@link OutputStream} and vice versa. This wrapper
- * also provides random access to bytes stored within. This wrapper allows users to finely
- * control the number of byte copies that occur.
- *
- * Anything stored within the in-memory buffer from offset {@link #size()} is considered temporary
- * unused storage.
- */
-@NotThreadSafe
-public class RandomAccessData {
- /**
- * A {@link Coder} which encodes the valid parts of this stream.
- * This follows the same encoding scheme as {@link ByteArrayCoder}.
- * This coder is deterministic and consistent with equals.
- *
- * This coder does not support encoding positive infinity.
- */
- public static class RandomAccessDataCoder extends AtomicCoder<RandomAccessData> {
- private static final RandomAccessDataCoder INSTANCE = new RandomAccessDataCoder();
-
- @JsonCreator
- public static RandomAccessDataCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(RandomAccessData value, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- if (value == POSITIVE_INFINITY) {
- throw new CoderException("Positive infinity can not be encoded.");
- }
- if (!context.isWholeStream) {
- VarInt.encode(value.size, outStream);
- }
- value.writeTo(outStream, 0, value.size);
- }
-
- @Override
- public RandomAccessData decode(InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- RandomAccessData rval = new RandomAccessData();
- if (!context.isWholeStream) {
- int length = VarInt.decodeInt(inStream);
- rval.readFrom(inStream, 0, length);
- } else {
- ByteStreams.copy(inStream, rval.asOutputStream());
- }
- return rval;
- }
-
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- @Override
- public boolean isRegisterByteSizeObserverCheap(
- RandomAccessData value, Coder.Context context) {
- return true;
- }
-
- @Override
- protected long getEncodedElementByteSize(RandomAccessData value, Coder.Context context)
- throws Exception {
- if (value == null) {
- throw new CoderException("cannot encode a null in memory stream");
- }
- long size = 0;
- if (!context.isWholeStream) {
- size += VarInt.getLength(value.size);
- }
- return size + value.size;
- }
- }
-
- public static final UnsignedLexicographicalComparator UNSIGNED_LEXICOGRAPHICAL_COMPARATOR =
- new UnsignedLexicographicalComparator();
-
- /**
- * A {@link Comparator} that compares two byte arrays lexicographically. It compares
- * values as a list of unsigned bytes. The first pair of values that follow any common prefix,
- * or when one array is a prefix of the other, treats the shorter array as the lesser.
- * For example, [] < [0x01] < [0x01, 0x7F] < [0x01, 0x80] < [0x02] < POSITIVE INFINITY.
- *
- * <p>Note that a token type of positive infinity is supported and is greater than
- * all other {@link RandomAccessData}.
- */
- public static final class UnsignedLexicographicalComparator
- implements Comparator<RandomAccessData> {
- // Do not instantiate
- private UnsignedLexicographicalComparator() {
- }
-
- @Override
- public int compare(RandomAccessData o1, RandomAccessData o2) {
- return compare(o1, o2, 0 /* start from the beginning */);
- }
-
- /**
- * Compare the two sets of bytes starting at the given offset.
- */
- public int compare(RandomAccessData o1, RandomAccessData o2, int startOffset) {
- if (o1 == o2) {
- return 0;
- }
- if (o1 == POSITIVE_INFINITY) {
- return 1;
- }
- if (o2 == POSITIVE_INFINITY) {
- return -1;
- }
-
- int minBytesLen = Math.min(o1.size, o2.size);
- for (int i = startOffset; i < minBytesLen; i++) {
- // unsigned comparison
- int b1 = o1.buffer[i] & 0xFF;
- int b2 = o2.buffer[i] & 0xFF;
- if (b1 == b2) {
- continue;
- }
- // Return the stream with the smaller byte as the smaller value.
- return b1 - b2;
- }
- // If one is a prefix of the other, return the shorter one as the smaller one.
- // If both lengths are equal, then both streams are equal.
- return o1.size - o2.size;
- }
-
- /**
- * Compute the length of the common prefix of the two provided sets of bytes.
- */
- public int commonPrefixLength(RandomAccessData o1, RandomAccessData o2) {
- int minBytesLen = Math.min(o1.size, o2.size);
- for (int i = 0; i < minBytesLen; i++) {
- // unsigned comparison
- int b1 = o1.buffer[i] & 0xFF;
- int b2 = o2.buffer[i] & 0xFF;
- if (b1 != b2) {
- return i;
- }
- }
- return minBytesLen;
- }
- }
-
- /** A token type representing positive infinity. */
- static final RandomAccessData POSITIVE_INFINITY = new RandomAccessData(0);
-
- /**
- * Returns a RandomAccessData that is the smallest value of same length which
- * is strictly greater than this. Note that if this is empty or is all 0xFF then
- * a token value of positive infinity is returned.
- *
- * The {@link UnsignedLexicographicalComparator} supports comparing {@link RandomAccessData}
- * with support for positive infinitiy.
- */
- public RandomAccessData increment() throws IOException {
- RandomAccessData copy = copy();
- for (int i = copy.size - 1; i >= 0; --i) {
- if (copy.buffer[i] != UnsignedBytes.MAX_VALUE) {
- copy.buffer[i] = UnsignedBytes.checkedCast(UnsignedBytes.toInt(copy.buffer[i]) + 1);
- return copy;
- }
- }
- return POSITIVE_INFINITY;
- }
-
- private static final int DEFAULT_INITIAL_BUFFER_SIZE = 128;
-
- /** Constructs a RandomAccessData with a default buffer size. */
- public RandomAccessData() {
- this(DEFAULT_INITIAL_BUFFER_SIZE);
- }
-
- /** Constructs a RandomAccessData with the initial buffer. */
- public RandomAccessData(byte[] initialBuffer) {
- checkNotNull(initialBuffer);
- this.buffer = initialBuffer;
- this.size = initialBuffer.length;
- }
-
- /** Constructs a RandomAccessData with the given buffer size. */
- public RandomAccessData(int initialBufferSize) {
- checkArgument(initialBufferSize >= 0, "Expected initial buffer size to be greater than zero.");
- this.buffer = new byte[initialBufferSize];
- }
-
- private byte[] buffer;
- private int size;
-
- /** Returns the backing array. */
- public byte[] array() {
- return buffer;
- }
-
- /** Returns the number of bytes in the backing array that are valid. */
- public int size() {
- return size;
- }
-
- /** Resets the end of the stream to the specified position. */
- public void resetTo(int position) {
- ensureCapacity(position);
- size = position;
- }
-
- private final OutputStream outputStream = new OutputStream() {
- @Override
- public void write(int b) throws IOException {
- ensureCapacity(size + 1);
- buffer[size] = (byte) b;
- size += 1;
- }
-
- @Override
- public void write(byte[] b, int offset, int length) throws IOException {
- ensureCapacity(size + length);
- System.arraycopy(b, offset, buffer, size, length);
- size += length;
- }
- };
-
- /**
- * Returns an output stream which writes to the backing buffer from the current position.
- * Note that the internal buffer will grow as required to accomodate all data written.
- */
- public OutputStream asOutputStream() {
- return outputStream;
- }
-
- /**
- * Returns an {@link InputStream} wrapper which supplies the portion of this backing byte buffer
- * starting at {@code offset} and up to {@code length} bytes. Note that the returned
- * {@link InputStream} is only a wrapper and any modifications to the underlying
- * {@link RandomAccessData} will be visible by the {@link InputStream}.
- */
- public InputStream asInputStream(final int offset, final int length) {
- return new ByteArrayInputStream(buffer, offset, length);
- }
-
- /**
- * Writes {@code length} bytes starting at {@code offset} from the backing data store to the
- * specified output stream.
- */
- public void writeTo(OutputStream out, int offset, int length) throws IOException {
- out.write(buffer, offset, length);
- }
-
- /**
- * Reads {@code length} bytes from the specified input stream writing them into the backing
- * data store starting at {@code offset}.
- *
- * <p>Note that the in memory stream will be grown to ensure there is enough capacity.
- */
- public void readFrom(InputStream inStream, int offset, int length) throws IOException {
- ensureCapacity(offset + length);
- ByteStreams.readFully(inStream, buffer, offset, length);
- size = offset + length;
- }
-
- /** Returns a copy of this RandomAccessData. */
- public RandomAccessData copy() throws IOException {
- RandomAccessData copy = new RandomAccessData(size);
- writeTo(copy.asOutputStream(), 0, size);
- return copy;
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == this) {
- return true;
- }
- if (!(other instanceof RandomAccessData)) {
- return false;
- }
- return UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(this, (RandomAccessData) other) == 0;
- }
-
- @Override
- public int hashCode() {
- int result = 1;
- for (int i = 0; i < size; ++i) {
- result = 31 * result + buffer[i];
- }
-
- return result;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("buffer", Arrays.copyOf(buffer, size))
- .add("size", size)
- .toString();
- }
-
- private void ensureCapacity(int minCapacity) {
- // If we have enough space, don't grow the buffer.
- if (minCapacity <= buffer.length) {
- return;
- }
-
- // Try to double the size of the buffer, if thats not enough, just use the new capacity.
- // Note that we use Math.min(long, long) to not cause overflow on the multiplication.
- int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.length * 2L);
- if (newCapacity < minCapacity) {
- newCapacity = minCapacity;
- }
- buffer = Arrays.copyOf(buffer, newCapacity);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimeUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimeUtil.java
deleted file mode 100644
index db5c760..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimeUtil.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.joda.time.DateTime;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.ReadableDuration;
-import org.joda.time.ReadableInstant;
-import org.joda.time.chrono.ISOChronology;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-/**
- * A helper class for converting between Dataflow API and SDK time
- * representations.
- *
- * <p>Dataflow API times are strings of the form
- * {@code YYYY-MM-dd'T'HH:mm:ss[.nnnn]'Z'}: that is, RFC 3339
- * strings with optional fractional seconds and a 'Z' offset.
- *
- * <p>Dataflow API durations are strings of the form {@code ['-']sssss[.nnnn]'s'}:
- * that is, seconds with optional fractional seconds and a literal 's' at the end.
- *
- * <p>In both formats, fractional seconds are either three digits (millisecond
- * resolution), six digits (microsecond resolution), or nine digits (nanosecond
- * resolution).
- */
-public final class TimeUtil {
- private TimeUtil() {} // Non-instantiable.
-
- private static final Pattern DURATION_PATTERN = Pattern.compile("(\\d+)(?:\\.(\\d+))?s");
- private static final Pattern TIME_PATTERN =
- Pattern.compile("(\\d{4})-(\\d{2})-(\\d{2})T(\\d{2}):(\\d{2}):(\\d{2})(?:\\.(\\d+))?Z");
-
- /**
- * Converts a {@link ReadableInstant} into a Dateflow API time value.
- */
- public static String toCloudTime(ReadableInstant instant) {
- // Note that since Joda objects use millisecond resolution, we always
- // produce either no fractional seconds or fractional seconds with
- // millisecond resolution.
-
- // Translate the ReadableInstant to a DateTime with ISOChronology.
- DateTime time = new DateTime(instant);
-
- int millis = time.getMillisOfSecond();
- if (millis == 0) {
- return String.format("%04d-%02d-%02dT%02d:%02d:%02dZ",
- time.getYear(),
- time.getMonthOfYear(),
- time.getDayOfMonth(),
- time.getHourOfDay(),
- time.getMinuteOfHour(),
- time.getSecondOfMinute());
- } else {
- return String.format("%04d-%02d-%02dT%02d:%02d:%02d.%03dZ",
- time.getYear(),
- time.getMonthOfYear(),
- time.getDayOfMonth(),
- time.getHourOfDay(),
- time.getMinuteOfHour(),
- time.getSecondOfMinute(),
- millis);
- }
- }
-
- /**
- * Converts a time value received via the Dataflow API into the corresponding
- * {@link Instant}.
- * @return the parsed time, or null if a parse error occurs
- */
- @Nullable
- public static Instant fromCloudTime(String time) {
- Matcher matcher = TIME_PATTERN.matcher(time);
- if (!matcher.matches()) {
- return null;
- }
- int year = Integer.valueOf(matcher.group(1));
- int month = Integer.valueOf(matcher.group(2));
- int day = Integer.valueOf(matcher.group(3));
- int hour = Integer.valueOf(matcher.group(4));
- int minute = Integer.valueOf(matcher.group(5));
- int second = Integer.valueOf(matcher.group(6));
- int millis = 0;
-
- String frac = matcher.group(7);
- if (frac != null) {
- int fracs = Integer.valueOf(frac);
- if (frac.length() == 3) { // millisecond resolution
- millis = fracs;
- } else if (frac.length() == 6) { // microsecond resolution
- millis = fracs / 1000;
- } else if (frac.length() == 9) { // nanosecond resolution
- millis = fracs / 1000000;
- } else {
- return null;
- }
- }
-
- return new DateTime(year, month, day, hour, minute, second, millis,
- ISOChronology.getInstanceUTC()).toInstant();
- }
-
- /**
- * Converts a {@link ReadableDuration} into a Dataflow API duration string.
- */
- public static String toCloudDuration(ReadableDuration duration) {
- // Note that since Joda objects use millisecond resolution, we always
- // produce either no fractional seconds or fractional seconds with
- // millisecond resolution.
- long millis = duration.getMillis();
- long seconds = millis / 1000;
- millis = millis % 1000;
- if (millis == 0) {
- return String.format("%ds", seconds);
- } else {
- return String.format("%d.%03ds", seconds, millis);
- }
- }
-
- /**
- * Converts a Dataflow API duration string into a {@link Duration}.
- * @return the parsed duration, or null if a parse error occurs
- */
- @Nullable
- public static Duration fromCloudDuration(String duration) {
- Matcher matcher = DURATION_PATTERN.matcher(duration);
- if (!matcher.matches()) {
- return null;
- }
- long millis = Long.valueOf(matcher.group(1)) * 1000;
- String frac = matcher.group(2);
- if (frac != null) {
- long fracs = Long.valueOf(frac);
- if (frac.length() == 3) { // millisecond resolution
- millis += fracs;
- } else if (frac.length() == 6) { // microsecond resolution
- millis += fracs / 1000;
- } else if (frac.length() == 9) { // nanosecond resolution
- millis += fracs / 1000000;
- } else {
- return null;
- }
- }
- return Duration.millis(millis);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/package-info.java
deleted file mode 100644
index f295419..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/** Defines utilities used to implement the harness that runs user code. **/
-package org.apache.beam.sdk.util.common.worker;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java
deleted file mode 100644
index b990212..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.util.RandomAccessData.RandomAccessDataCoder;
-
-import com.google.common.primitives.UnsignedBytes;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.util.Arrays;
-
-/**
- * Tests for {@link RandomAccessData}.
- */
-@RunWith(JUnit4.class)
-public class RandomAccessDataTest {
- private static final byte[] TEST_DATA_A = new byte[]{ 0x01, 0x02, 0x03 };
- private static final byte[] TEST_DATA_B = new byte[]{ 0x06, 0x05, 0x04, 0x03 };
- private static final byte[] TEST_DATA_C = new byte[]{ 0x06, 0x05, 0x03, 0x03 };
-
- @Rule public ExpectedException expectedException = ExpectedException.none();
-
- @Test
- public void testCoder() throws Exception {
- RandomAccessData streamA = new RandomAccessData();
- streamA.asOutputStream().write(TEST_DATA_A);
- RandomAccessData streamB = new RandomAccessData();
- streamB.asOutputStream().write(TEST_DATA_A);
- CoderProperties.coderDecodeEncodeEqual(RandomAccessDataCoder.of(), streamA);
- CoderProperties.coderDeterministic(RandomAccessDataCoder.of(), streamA, streamB);
- CoderProperties.coderConsistentWithEquals(RandomAccessDataCoder.of(), streamA, streamB);
- CoderProperties.coderSerializable(RandomAccessDataCoder.of());
- CoderProperties.structuralValueConsistentWithEquals(
- RandomAccessDataCoder.of(), streamA, streamB);
- assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA, Context.NESTED));
- assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA, Context.OUTER));
- assertEquals(4, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA, Context.NESTED));
- assertEquals(3, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA, Context.OUTER));
- }
-
- @Test
- public void testCoderWithPositiveInfinityIsError() throws Exception {
- expectedException.expect(CoderException.class);
- expectedException.expectMessage("Positive infinity can not be encoded");
- RandomAccessDataCoder.of().encode(
- RandomAccessData.POSITIVE_INFINITY, new ByteArrayOutputStream(), Context.OUTER);
- }
-
- @Test
- public void testLexicographicalComparator() throws Exception {
- RandomAccessData streamA = new RandomAccessData();
- streamA.asOutputStream().write(TEST_DATA_A);
- RandomAccessData streamB = new RandomAccessData();
- streamB.asOutputStream().write(TEST_DATA_B);
- RandomAccessData streamC = new RandomAccessData();
- streamC.asOutputStream().write(TEST_DATA_C);
- assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
- streamA, streamB) < 0);
- assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
- streamB, streamA) > 0);
- assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
- streamB, streamB) == 0);
- // Check common prefix length.
- assertEquals(2, RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.commonPrefixLength(
- streamB, streamC));
- // Check that we honor the start offset.
- assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
- streamB, streamC, 3) == 0);
- // Test positive infinity comparisons.
- assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
- streamA, RandomAccessData.POSITIVE_INFINITY) < 0);
- assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
- RandomAccessData.POSITIVE_INFINITY, RandomAccessData.POSITIVE_INFINITY) == 0);
- assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
- RandomAccessData.POSITIVE_INFINITY, streamA) > 0);
- }
-
- @Test
- public void testEqualsAndHashCode() throws Exception {
- // Test that equality by reference works
- RandomAccessData streamA = new RandomAccessData();
- streamA.asOutputStream().write(TEST_DATA_A);
- assertEquals(streamA, streamA);
- assertEquals(streamA.hashCode(), streamA.hashCode());
-
- // Test different objects containing the same data are the same
- RandomAccessData streamACopy = new RandomAccessData();
- streamACopy.asOutputStream().write(TEST_DATA_A);
- assertEquals(streamA, streamACopy);
- assertEquals(streamA.hashCode(), streamACopy.hashCode());
-
- // Test same length streams with different data differ
- RandomAccessData streamB = new RandomAccessData();
- streamB.asOutputStream().write(new byte[]{ 0x01, 0x02, 0x04 });
- assertNotEquals(streamA, streamB);
- assertNotEquals(streamA.hashCode(), streamB.hashCode());
-
- // Test different length streams differ
- streamB.asOutputStream().write(TEST_DATA_B);
- assertNotEquals(streamA, streamB);
- assertNotEquals(streamA.hashCode(), streamB.hashCode());
- }
-
- @Test
- public void testResetTo() throws Exception {
- RandomAccessData stream = new RandomAccessData();
- stream.asOutputStream().write(TEST_DATA_A);
- stream.resetTo(1);
- assertEquals(1, stream.size());
- stream.asOutputStream().write(TEST_DATA_A);
- assertArrayEquals(new byte[]{ 0x01, 0x01, 0x02, 0x03 },
- Arrays.copyOf(stream.array(), stream.size()));
- }
-
- @Test
- public void testAsInputStream() throws Exception {
- RandomAccessData stream = new RandomAccessData();
- stream.asOutputStream().write(TEST_DATA_A);
- InputStream in = stream.asInputStream(1, 1);
- assertEquals(0x02, in.read());
- assertEquals(-1, in.read());
- in.close();
- }
-
- @Test
- public void testReadFrom() throws Exception {
- ByteArrayInputStream bais = new ByteArrayInputStream(TEST_DATA_A);
- RandomAccessData stream = new RandomAccessData();
- stream.readFrom(bais, 3, 2);
- assertArrayEquals(new byte[]{ 0x00, 0x00, 0x00, 0x01, 0x02 },
- Arrays.copyOf(stream.array(), stream.size()));
- bais.close();
- }
-
- @Test
- public void testWriteTo() throws Exception {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- RandomAccessData stream = new RandomAccessData();
- stream.asOutputStream().write(TEST_DATA_B);
- stream.writeTo(baos, 1, 2);
- assertArrayEquals(new byte[]{ 0x05, 0x04 }, baos.toByteArray());
- baos.close();
- }
-
- @Test
- public void testThatRandomAccessDataGrowsWhenResettingToPositionBeyondEnd() throws Exception {
- RandomAccessData stream = new RandomAccessData(0);
- assertArrayEquals(new byte[0], stream.array());
- stream.resetTo(3); // force resize
- assertArrayEquals(new byte[]{ 0x00, 0x00, 0x00 }, stream.array());
- }
-
- @Test
- public void testThatRandomAccessDataGrowsWhenReading() throws Exception {
- RandomAccessData stream = new RandomAccessData(0);
- assertArrayEquals(new byte[0], stream.array());
- stream.readFrom(new ByteArrayInputStream(TEST_DATA_A), 0, TEST_DATA_A.length);
- assertArrayEquals(TEST_DATA_A,
- Arrays.copyOf(stream.array(), TEST_DATA_A.length));
- }
-
- @Test
- public void testIncrement() throws Exception {
- assertEquals(new RandomAccessData(new byte[]{ 0x00, 0x01 }),
- new RandomAccessData(new byte[]{ 0x00, 0x00 }).increment());
- assertEquals(new RandomAccessData(new byte[]{ 0x01, UnsignedBytes.MAX_VALUE }),
- new RandomAccessData(new byte[]{ 0x00, UnsignedBytes.MAX_VALUE }).increment());
-
- // Test for positive infinity
- assertSame(RandomAccessData.POSITIVE_INFINITY, new RandomAccessData(new byte[0]).increment());
- assertSame(RandomAccessData.POSITIVE_INFINITY,
- new RandomAccessData(new byte[]{ UnsignedBytes.MAX_VALUE }).increment());
- assertSame(RandomAccessData.POSITIVE_INFINITY, RandomAccessData.POSITIVE_INFINITY.increment());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java
deleted file mode 100644
index b318dee..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.apache.beam.sdk.util.TimeUtil.fromCloudDuration;
-import static org.apache.beam.sdk.util.TimeUtil.fromCloudTime;
-import static org.apache.beam.sdk.util.TimeUtil.toCloudDuration;
-import static org.apache.beam.sdk.util.TimeUtil.toCloudTime;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link TimeUtil}. */
-@RunWith(JUnit4.class)
-public final class TimeUtilTest {
- @Test
- public void toCloudTimeShouldPrintTimeStrings() {
- assertEquals("1970-01-01T00:00:00Z", toCloudTime(new Instant(0)));
- assertEquals("1970-01-01T00:00:00.001Z", toCloudTime(new Instant(1)));
- }
-
- @Test
- public void fromCloudTimeShouldParseTimeStrings() {
- assertEquals(new Instant(0), fromCloudTime("1970-01-01T00:00:00Z"));
- assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001Z"));
- assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000Z"));
- assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001001Z"));
- assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000000Z"));
- assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000001Z"));
- assertNull(fromCloudTime(""));
- assertNull(fromCloudTime("1970-01-01T00:00:00"));
- }
-
- @Test
- public void toCloudDurationShouldPrintDurationStrings() {
- assertEquals("0s", toCloudDuration(Duration.ZERO));
- assertEquals("4s", toCloudDuration(Duration.millis(4000)));
- assertEquals("4.001s", toCloudDuration(Duration.millis(4001)));
- }
-
- @Test
- public void fromCloudDurationShouldParseDurationStrings() {
- assertEquals(Duration.millis(4000), fromCloudDuration("4s"));
- assertEquals(Duration.millis(4001), fromCloudDuration("4.001s"));
- assertEquals(Duration.millis(4001), fromCloudDuration("4.001000s"));
- assertEquals(Duration.millis(4001), fromCloudDuration("4.001001s"));
- assertEquals(Duration.millis(4001), fromCloudDuration("4.001000000s"));
- assertEquals(Duration.millis(4001), fromCloudDuration("4.001000001s"));
- assertNull(fromCloudDuration(""));
- assertNull(fromCloudDuration("4"));
- assertNull(fromCloudDuration("4.1"));
- assertNull(fromCloudDuration("4.1s"));
- }
-}
[3/7] incubator-beam git commit: [BEAM-151] Rebase onto apache/master
updating packages to new structure
Posted by lc...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java
deleted file mode 100644
index 8df46dd..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java
+++ /dev/null
@@ -1,811 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.worker;
-
-import static org.apache.beam.sdk.util.Structs.addLong;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.RandomAccessData;
-import org.apache.beam.sdk.util.VarInt;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- * An Ism file is a prefix encoded composite key value file broken into shards. Each composite
- * key is composed of a fixed number of component keys. A fixed number of those component keys
- * the shard key portion; see {@link IsmRecord} and {@link IsmRecordCoder} for further details
- * around the data format. In addition to the data, there is a bloom filter,
- * and multiple indices to allow for efficient retrieval.
- *
- * <p>An Ism file is composed of these high level sections (in order):
- * <ul>
- * <li>shard block</li>
- * <li>bloom filter (See {@code ScalableBloomFilter} for details on encoding format)</li>
- * <li>shard index</li>
- * <li>footer (See {@link Footer} for details on encoding format)</li>
- * </ul>
- *
- * <p>The shard block is composed of multiple copies of the following:
- * <ul>
- * <li>data block</li>
- * <li>data index</li>
- * </ul>
- *
- * <p>The data block is composed of multiple copies of the following:
- * <ul>
- * <li>key prefix (See {@link KeyPrefix} for details on encoding format)</li>
- * <li>unshared key bytes</li>
- * <li>value bytes</li>
- * <li>optional 0x00 0x00 bytes followed by metadata bytes
- * (if the following 0x00 0x00 bytes are not present, then there are no metadata bytes)</li>
- * </ul>
- * <p>Each key written into the data block must be in unsigned lexicographically increasing order
- * and also its shard portion of the key must hash to the same shard id as all other keys
- * within the same data block. The hashing function used is the
- * <a href="http://smhasher.googlecode.com/svn/trunk/MurmurHash3.cpp">
- * 32-bit murmur3 algorithm, x86 variant</a> (little-endian variant),
- * using {@code 1225801234} as the seed value.
- *
- * <p>The data index is composed of {@code N} copies of the following:
- * <ul>
- * <li>key prefix (See {@link KeyPrefix} for details on encoding format)</li>
- * <li>unshared key bytes</li>
- * <li>byte offset to key prefix in data block (variable length long coding)</li>
- * </ul>
- *
- * <p>The shard index is composed of a {@link VarInt variable length integer} encoding representing
- * the number of shard index records followed by that many shard index records.
- * See {@link IsmShardCoder} for further details as to its encoding scheme.
- */
-public class IsmFormat {
- // Seed of the murmur3_32 shard hash; it is part of the documented file format
- // (see the class javadoc), so changing it changes every key's shard id.
- private static final int HASH_SEED = 1225801234;
- // Hash used by IsmRecordCoder#encodeAndHash over the encoded shard key bytes.
- private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32(HASH_SEED);
- // Mask applied to the hash, giving value-record shard ids 0-127; metadata records are further
- // offset by SHARD_BITS + 1 (see encodeAndHash) so the two id ranges never overlap.
- public static final int SHARD_BITS = 0x7F; // [0-127] shards + [128-255] metadata shards
-
-  /**
-   * A record containing a composite key and either a value or metadata. The composite key
-   * must not contain the metadata key component place holder if producing a value record, and must
-   * contain the metadata component key place holder if producing a metadata record.
-   *
-   * <p>The composite key is a fixed number of component keys where the first {@code N} component
-   * keys are used to create a shard id via hashing. See {@link IsmRecordCoder#hash(List)} for
-   * further details.
-   */
-  @AutoValue
-  public abstract static class IsmRecord<V> {
-    abstract List<?> keyComponents();
-    @Nullable abstract V value();
-    @Nullable abstract byte[] metadata();
-
-    IsmRecord() {} // Prevent public constructor
-
-    /**
-     * Returns an IsmRecord with the specified key components and value.
-     *
-     * @throws NullPointerException if {@code keyComponents} is null
-     * @throws IllegalArgumentException if {@code keyComponents} is empty or contains the
-     *     metadata key
-     */
-    public static <V> IsmRecord<V> of(List<?> keyComponents, V value) {
-      checkNotNull(keyComponents, "Expected non-null list of key components.");
-      checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
-      checkArgument(!isMetadataKey(keyComponents),
-          "Expected key components to not contain metadata key.");
-      return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, value, null);
-    }
-
-    /**
-     * Returns a metadata IsmRecord with the specified key components and metadata bytes.
-     *
-     * @throws NullPointerException if {@code keyComponents} or {@code metadata} is null
-     * @throws IllegalArgumentException if {@code keyComponents} is empty or does not contain
-     *     the metadata key
-     */
-    public static <V> IsmRecord<V> meta(List<?> keyComponents, byte[] metadata) {
-      checkNotNull(metadata, "Expected non-null metadata.");
-      checkNotNull(keyComponents, "Expected non-null list of key components.");
-      checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
-      checkArgument(isMetadataKey(keyComponents),
-          "Expected key components to contain metadata key.");
-      return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, null, metadata);
-    }
-
-    /** Returns the list of key components. */
-    public List<?> getKeyComponents() {
-      return keyComponents();
-    }
-
-    /** Returns the key component at the specified index. */
-    public Object getKeyComponent(int index) {
-      return keyComponents().get(index);
-    }
-
-    /**
-     * Returns the value. Throws {@link IllegalStateException} if this is not a
-     * value record.
-     */
-    public V getValue() {
-      checkState(!isMetadataKey(keyComponents()),
-          "This is a metadata record and not a value record.");
-      return value();
-    }
-
-    /**
-     * Returns the metadata. Throws {@link IllegalStateException} if this is not a
-     * metadata record.
-     */
-    public byte[] getMetadata() {
-      checkState(isMetadataKey(keyComponents()),
-          "This is a value record and not a metadata record.");
-      return metadata();
-    }
-  }
-
-  /**
-   * A {@link Coder} for {@link IsmRecord}s.
-   *
-   * <p>Note that this coder standalone will not produce an Ism file. This coder can be used
-   * to materialize a {@link PCollection} of {@link IsmRecord}s. Only when this coder
-   * is combined with an {@link IsmSink} will one produce an Ism file.
-   *
-   * <p>The {@link IsmRecord} encoded format is:
-   * <ul>
-   *   <li>encoded key component 1 using key component coder 1</li>
-   *   <li>...</li>
-   *   <li>encoded key component N using key component coder N</li>
-   *   <li>encoded value using value coder, or, for metadata records, the metadata bytes
-   *       encoded using {@link ByteArrayCoder}</li>
-   * </ul>
-   */
-  public static class IsmRecordCoder<V>
-      extends StandardCoder<IsmRecord<V>> {
-    /**
-     * Returns an IsmRecordCoder with the specified key component coders, value coder.
-     *
-     * @param numberOfShardKeyCoders how many leading key components of a value record are
-     *     hashed to pick its shard; must be positive and at most the number of key coders
-     * @param numberOfMetadataShardKeyCoders how many leading key components of a metadata
-     *     record are hashed to pick its shard; at most the number of key coders
-     * @param keyComponentCoders one coder per key component; must be non-empty
-     * @param valueCoder the coder used for the value of value records
-     */
-    public static <V> IsmRecordCoder<V> of(
-        int numberOfShardKeyCoders,
-        int numberOfMetadataShardKeyCoders,
-        List<Coder<?>> keyComponentCoders,
-        Coder<V> valueCoder) {
-      checkNotNull(keyComponentCoders, "Expected non-null list of key component coders.");
-      checkArgument(keyComponentCoders.size() > 0,
-          "Expected at least one key component coder.");
-      checkArgument(numberOfShardKeyCoders > 0,
-          "Expected a positive number of shard key coders but got %s.", numberOfShardKeyCoders);
-      checkArgument(numberOfShardKeyCoders <= keyComponentCoders.size(),
-          "Expected at most %s shard key coders but got %s.",
-          keyComponentCoders.size(), numberOfShardKeyCoders);
-      checkArgument(numberOfMetadataShardKeyCoders <= keyComponentCoders.size(),
-          "Expected at most %s metadata shard key coders but got %s.",
-          keyComponentCoders.size(), numberOfMetadataShardKeyCoders);
-      return new IsmRecordCoder<>(
-          numberOfShardKeyCoders,
-          numberOfMetadataShardKeyCoders,
-          keyComponentCoders,
-          valueCoder);
-    }
-
-    /**
-     * Returns an IsmRecordCoder with the specified coders. Note that this method is not meant
-     * to be called by users but used by Jackson when decoding this coder.
-     *
-     * <p>The last element of {@code components} is the value coder; all preceding elements are
-     * the key component coders.
-     */
-    @JsonCreator
-    public static IsmRecordCoder<?> of(
-        @JsonProperty(PropertyNames.NUM_SHARD_CODERS) int numberOfShardCoders,
-        @JsonProperty(PropertyNames.NUM_METADATA_SHARD_CODERS) int numberOfMetadataShardCoders,
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
-      // Template arguments avoid building the message unless the check fails.
-      checkArgument(components.size() >= 2,
-          "Expecting at least 2 components, got %s", components.size());
-      return of(
-          numberOfShardCoders,
-          numberOfMetadataShardCoders,
-          components.subList(0, components.size() - 1),
-          components.get(components.size() - 1));
-    }
-
-    private final int numberOfShardKeyCoders;
-    private final int numberOfMetadataShardKeyCoders;
-    private final List<Coder<?>> keyComponentCoders;
-    private final Coder<V> valueCoder;
-
-    private IsmRecordCoder(
-        int numberOfShardKeyCoders,
-        int numberOfMetadataShardKeyCoders,
-        List<Coder<?>> keyComponentCoders, Coder<V> valueCoder) {
-      this.numberOfShardKeyCoders = numberOfShardKeyCoders;
-      this.numberOfMetadataShardKeyCoders = numberOfMetadataShardKeyCoders;
-      this.keyComponentCoders = keyComponentCoders;
-      this.valueCoder = valueCoder;
-    }
-
-    /** Returns the list of key component coders. */
-    public List<Coder<?>> getKeyComponentCoders() {
-      return keyComponentCoders;
-    }
-
-    /** Returns the key coder at the specified index. */
-    public Coder getKeyComponentCoder(int index) {
-      return keyComponentCoders.get(index);
-    }
-
-    /** Returns the value coder. */
-    public Coder<V> getValueCoder() {
-      return valueCoder;
-    }
-
-    /**
-     * Encodes every key component with its coder followed by either the metadata bytes (for
-     * metadata records) or the value. All parts use a nested context.
-     *
-     * @throws CoderException if the record's key component count differs from the number of
-     *     key component coders
-     */
-    @Override
-    public void encode(IsmRecord<V> value, OutputStream outStream,
-        Coder.Context context) throws CoderException, IOException {
-      if (value.getKeyComponents().size() != keyComponentCoders.size()) {
-        throw new CoderException(String.format(
-            "Expected %s key component(s) but received key component(s) %s.",
-            keyComponentCoders.size(), value.getKeyComponents()));
-      }
-      for (int i = 0; i < keyComponentCoders.size(); ++i) {
-        getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream, context.nested());
-      }
-      if (isMetadataKey(value.getKeyComponents())) {
-        ByteArrayCoder.of().encode(value.getMetadata(), outStream, context.nested());
-      } else {
-        valueCoder.encode(value.getValue(), outStream, context.nested());
-      }
-    }
-
-    /** Decodes a record written by {@link #encode}. */
-    @Override
-    public IsmRecord<V> decode(InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size());
-      for (Coder<?> keyCoder : keyComponentCoders) {
-        keyComponents.add(keyCoder.decode(inStream, context.nested()));
-      }
-      if (isMetadataKey(keyComponents)) {
-        return IsmRecord.<V>meta(
-            keyComponents, ByteArrayCoder.of().decode(inStream, context.nested()));
-      } else {
-        return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream, context.nested()));
-      }
-    }
-
-    /**
-     * Returns how many leading key components are hashed to compute the shard id: the metadata
-     * count if {@code keyComponents} contains the metadata key, otherwise the value count.
-     */
-    public int getNumberOfShardKeyCoders(List<?> keyComponents) {
-      if (isMetadataKey(keyComponents)) {
-        return numberOfMetadataShardKeyCoders;
-      } else {
-        return numberOfShardKeyCoders;
-      }
-    }
-
-    /**
-     * Computes the shard id for the given key component(s).
-     *
-     * <p>The shard keys are encoded into their byte representations and hashed using the
-     * <a href="http://smhasher.googlecode.com/svn/trunk/MurmurHash3.cpp">
-     * 32-bit murmur3 algorithm, x86 variant</a> (little-endian variant),
-     * using {@code 1225801234} as the seed value. We ensure that shard ids for
-     * metadata keys and normal keys do not overlap.
-     */
-    public <V, T> int hash(List<?> keyComponents) {
-      return encodeAndHash(keyComponents, new RandomAccessData(), new ArrayList<Integer>());
-    }
-
-    /**
-     * Computes the shard id for the given key component(s).
-     *
-     * <p>Mutates {@code keyBytesToMutate} such that when returned, it contains the encoded
-     * version of the key components.
-     */
-    public <V, T> int encodeAndHash(List<?> keyComponents, RandomAccessData keyBytesToMutate) {
-      return encodeAndHash(keyComponents, keyBytesToMutate, new ArrayList<Integer>());
-    }
-
-    /**
-     * Computes the shard id for the given key component(s).
-     *
-     * <p>Mutates {@code keyBytesToMutate} such that when returned, it contains the encoded
-     * version of the key components. Also, mutates {@code keyComponentByteOffsetsToMutate} to
-     * store the location where each key component's encoded byte representation ends within
-     * {@code keyBytesToMutate}.
-     *
-     * @throws IllegalArgumentException if there are more key components than coders, or fewer
-     *     key components than the number of shard key coders that applies to this key
-     * @throws IllegalStateException if encoding a key component fails
-     */
-    public <V, T> int encodeAndHash(
-        List<?> keyComponents,
-        RandomAccessData keyBytesToMutate,
-        List<Integer> keyComponentByteOffsetsToMutate) {
-      checkNotNull(keyComponents);
-      checkArgument(keyComponents.size() <= keyComponentCoders.size(),
-          "Expected at most %s key component(s) but received %s.",
-          keyComponentCoders.size(), keyComponents);
-
-      final int numberOfKeyCodersToUse;
-      final int shardOffset;
-      if (isMetadataKey(keyComponents)) {
-        numberOfKeyCodersToUse = numberOfMetadataShardKeyCoders;
-        // Metadata shard ids live above the value shard ids so the two never collide.
-        shardOffset = SHARD_BITS + 1;
-      } else {
-        numberOfKeyCodersToUse = numberOfShardKeyCoders;
-        shardOffset = 0;
-      }
-
-      // Report the count that actually applies; for metadata keys this is the metadata count.
-      checkArgument(numberOfKeyCodersToUse <= keyComponents.size(),
-          "Expected at least %s key component(s) but received %s.",
-          numberOfKeyCodersToUse, keyComponents);
-
-      try {
-        // Encode the shard portion
-        for (int i = 0; i < numberOfKeyCodersToUse; ++i) {
-          getKeyComponentCoder(i).encode(
-              keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
-          keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
-        }
-        int rval = HASH_FUNCTION.hashBytes(
-            keyBytesToMutate.array(), 0, keyBytesToMutate.size()).asInt() & SHARD_BITS;
-        rval += shardOffset;
-
-        // Encode the remainder
-        for (int i = numberOfKeyCodersToUse; i < keyComponents.size(); ++i) {
-          getKeyComponentCoder(i).encode(
-              keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
-          keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
-        }
-        return rval;
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            String.format("Failed to hash %s with coder %s", keyComponents, this), e);
-      }
-    }
-
-    /** Returns the key component coders followed by the value coder. */
-    @Override
-    public List<Coder<?>> getCoderArguments() {
-      return ImmutableList.<Coder<?>>builder()
-          .addAll(keyComponentCoders)
-          .add(valueCoder)
-          .build();
-    }
-
-    /** Adds the shard key coder counts so that the Jackson factory can restore them. */
-    @Override
-    public CloudObject asCloudObject() {
-      CloudObject cloudObject = super.asCloudObject();
-      addLong(cloudObject, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders);
-      addLong(cloudObject, PropertyNames.NUM_METADATA_SHARD_CODERS, numberOfMetadataShardKeyCoders);
-      return cloudObject;
-    }
-
-    @Override
-    public void verifyDeterministic() throws Coder.NonDeterministicException {
-      verifyDeterministic("Key component coders expected to be deterministic.", keyComponentCoders);
-      verifyDeterministic("Value coder expected to be deterministic.", valueCoder);
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      for (Coder<?> keyComponentCoder : keyComponentCoders) {
-        if (!keyComponentCoder.consistentWithEquals()) {
-          return false;
-        }
-      }
-      return valueCoder.consistentWithEquals();
-    }
-
-    /**
-     * Returns a record built from the structural values of each part when every component
-     * coder is consistent with equals; otherwise defers to {@link StandardCoder}.
-     *
-     * @throws IllegalStateException if a non-null record's key component count differs from the
-     *     number of key component coders
-     */
-    @Override
-    public Object structuralValue(IsmRecord<V> record) throws Exception {
-      // The null check must come before any dereference of record; previously it came after
-      // record.getKeyComponents() and was therefore dead.
-      if (record != null) {
-        checkState(record.getKeyComponents().size() == keyComponentCoders.size(),
-            "Expected the number of key component coders %s "
-            + "to match the number of key components %s.",
-            keyComponentCoders.size(), record.getKeyComponents());
-
-        if (consistentWithEquals()) {
-          ArrayList<Object> keyComponentStructuralValues = new ArrayList<>();
-          for (int i = 0; i < keyComponentCoders.size(); ++i) {
-            keyComponentStructuralValues.add(
-                getKeyComponentCoder(i).structuralValue(record.getKeyComponent(i)));
-          }
-          if (isMetadataKey(record.getKeyComponents())) {
-            return IsmRecord.meta(keyComponentStructuralValues, record.getMetadata());
-          } else {
-            return IsmRecord.of(keyComponentStructuralValues,
-                valueCoder.structuralValue(record.getValue()));
-          }
-        }
-      }
-      return super.structuralValue(record);
-    }
-  }
-
- /**
- * Validates that the key portion of the given coder is deterministic.
- */
- public static void validateCoderIsCompatible(IsmRecordCoder<?> coder) {
- for (Coder<?> keyComponentCoder : coder.getKeyComponentCoders()) {
- try {
- keyComponentCoder.verifyDeterministic();
- } catch (NonDeterministicException e) {
- throw new IllegalArgumentException(
- String.format("Key component coder %s is expected to be deterministic.",
- keyComponentCoder), e);
- }
- }
- }
-
- /** Returns true if and only if any of the passed in key components represent a metadata key. */
- public static boolean isMetadataKey(List<?> keyComponents) {
- for (Object keyComponent : keyComponents) {
- if (keyComponent == METADATA_KEY) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * A marker object representing the wildcard metadata key component.
- *
- * <p>isMetadataKey and MetadataKeyCoder detect it by reference identity ({@code ==}). Its
- * {@code equals} is identity-based as well, and its constant {@code hashCode} is trivially
- * consistent with that.
- */
- private static final Object METADATA_KEY = new Object() {
- @Override
- public String toString() {
- return "META";
- }
-
- @Override
- public boolean equals(Object obj) {
- return this == obj;
- }
-
- @Override
- public int hashCode() {
- return -1248902349;
- }
- };
-
-  /**
-   * Returns the singleton wildcard object that stands in for a key component of a metadata
-   * record. It is encoded by {@link MetadataKeyCoder}.
-   */
-  public static Object getMetadataKey() {
-    return METADATA_KEY;
-  }
-
-  /**
-   * A coder for metadata key component. Can be used to wrap key component coder allowing for
-   * the metadata key component to be used as a place holder instead of an actual key.
-   *
-   * <p>Encoding: a single marker byte, {@code 0} for the metadata key (nothing follows), or
-   * {@code 1} followed by the key encoded with the wrapped key coder in a nested context.
-   */
-  public static class MetadataKeyCoder<K> extends StandardCoder<K> {
-    /** Returns a MetadataKeyCoder wrapping the given key coder. */
-    public static <K> MetadataKeyCoder<K> of(Coder<K> keyCoder) {
-      checkNotNull(keyCoder);
-      return new MetadataKeyCoder<>(keyCoder);
-    }
-
-    /**
-     * Returns a MetadataKeyCoder with the specified coders. Note that this method is not meant
-     * to be called by users but used by Jackson when decoding this coder.
-     */
-    @JsonCreator
-    public static MetadataKeyCoder<?> of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
-      // Template arguments avoid building the message unless the check fails.
-      checkArgument(components.size() == 1,
-          "Expecting one component, got %s", components.size());
-      return of(components.get(0));
-    }
-
-    private final Coder<K> keyCoder;
-
-    private MetadataKeyCoder(Coder<K> keyCoder) {
-      this.keyCoder = keyCoder;
-    }
-
-    /** Returns the wrapped coder used for non-metadata keys. */
-    public Coder<K> getKeyCoder() {
-      return keyCoder;
-    }
-
-    @Override
-    public void encode(K value, OutputStream outStream, Coder.Context context)
-        throws CoderException, IOException {
-      if (value == METADATA_KEY) {
-        outStream.write(0);
-      } else {
-        outStream.write(1);
-        keyCoder.encode(value, outStream, context.nested());
-      }
-    }
-
-    /**
-     * Decodes a key written by {@link #encode}.
-     *
-     * @throws CoderException if the marker byte is neither {@code 0} nor {@code 1}, which
-     *     includes end of stream (marker {@code -1})
-     */
-    @Override
-    @SuppressWarnings("unchecked") // The metadata key marker deliberately stands in for any K.
-    public K decode(InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      int marker = inStream.read();
-      if (marker == 0) {
-        return (K) getMetadataKey();
-      } else if (marker == 1) {
-        return keyCoder.decode(inStream, context.nested());
-      } else {
-        throw new CoderException(String.format("Expected marker but got %s.", marker));
-      }
-    }
-
-    @Override
-    public List<Coder<?>> getCoderArguments() {
-      return ImmutableList.<Coder<?>>of(keyCoder);
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic("Expected key coder to be deterministic", keyCoder);
-    }
-  }
-
- /**
- * A shard descriptor containing shard id, the data block offset, and the index offset for the
- * given shard.
- */
- @AutoValue
- public abstract static class IsmShard {
- abstract int id();
- abstract long blockOffset();
- abstract long indexOffset();
-
- IsmShard() {}
-
- /** Returns an IsmShard with the given id, block offset and no index offset. */
- public static IsmShard of(int id, long blockOffset) {
- IsmShard ismShard = new AutoValue_IsmFormat_IsmShard(id, blockOffset, -1);
- checkState(id >= 0,
- "%s attempting to be written with negative shard id.",
- ismShard);
- checkState(blockOffset >= 0,
- "%s attempting to be written with negative block offset.",
- ismShard);
- return ismShard;
- }
-
- /** Returns an IsmShard with the given id, block offset, and index offset. */
- public static IsmShard of(int id, long blockOffset, long indexOffset) {
- IsmShard ismShard = new AutoValue_IsmFormat_IsmShard(id, blockOffset, indexOffset);
- checkState(id >= 0,
- "%s attempting to be written with negative shard id.",
- ismShard);
- checkState(blockOffset >= 0,
- "%s attempting to be written with negative block offset.",
- ismShard);
- checkState(indexOffset >= 0,
- "%s attempting to be written with negative index offset.",
- ismShard);
- return ismShard;
- }
-
- /** Return the shard id. */
- public int getId() {
- return id();
- }
-
- /** Return the absolute position within the Ism file where the data block begins. */
- public long getBlockOffset() {
- return blockOffset();
- }
-
- /**
- * Return the absolute position within the Ism file where the index block begins.
- * Throws {@link IllegalStateException} if the index offset was never specified.
- */
- public long getIndexOffset() {
- checkState(indexOffset() >= 0,
- "Unable to fetch index offset because it was never specified.");
- return indexOffset();
- }
-
- /** Returns a new IsmShard like this one with the specified index offset. */
- public IsmShard withIndexOffset(long indexOffset) {
- return of(id(), blockOffset(), indexOffset);
- }
- }
-
-  /**
-   * Coder for the shard index: a {@link ListCoder} of shards, each encoded by
-   * {@link IsmShardCoder}. See those two coders for the respective encodings.
-   */
-  public static final Coder<List<IsmShard>> ISM_SHARD_INDEX_CODER =
-      ListCoder.<IsmShard>of(IsmShardCoder.of());
-
-  /**
-   * A coder for {@link IsmShard}s.
-   *
-   * <p>The shard descriptor is encoded as:
-   * <ul>
-   *   <li>id (variable length integer encoding)</li>
-   *   <li>blockOffset (variable length long encoding)</li>
-   *   <li>indexOffset (variable length long encoding)</li>
-   * </ul>
-   *
-   * <p>Only shards that have an index offset can be encoded.
-   */
-  public static class IsmShardCoder extends AtomicCoder<IsmShard> {
-    private static final IsmShardCoder INSTANCE = new IsmShardCoder();
-
-    /** Returns an IsmShardCoder. */
-    @JsonCreator
-    public static IsmShardCoder of() {
-      return INSTANCE;
-    }
-
-    private IsmShardCoder() {
-    }
-
-    /**
-     * Encodes the id, block offset and index offset of {@code value}.
-     *
-     * @throws IllegalStateException if {@code value} was created without an index offset
-     */
-    @Override
-    public void encode(IsmShard value, OutputStream outStream, Coder.Context context)
-        throws CoderException, IOException {
-      // Check the raw field: calling getIndexOffset() here would throw its own generic message
-      // first, so this shard-specific message would never be reported.
-      checkState(value.indexOffset() >= 0,
-          "%s attempting to be written without index offset.",
-          value);
-      VarIntCoder.of().encode(value.getId(), outStream, context.nested());
-      VarLongCoder.of().encode(value.getBlockOffset(), outStream, context.nested());
-      VarLongCoder.of().encode(value.getIndexOffset(), outStream, context.nested());
-    }
-
-    /** Decodes a shard written by {@link #encode}. */
-    @Override
-    public IsmShard decode(
-        InputStream inStream, Coder.Context context) throws CoderException, IOException {
-      // Use nested contexts to mirror encode().
-      return IsmShard.of(
-          VarIntCoder.of().decode(inStream, context.nested()),
-          VarLongCoder.of().decode(inStream, context.nested()),
-          VarLongCoder.of().decode(inStream, context.nested()));
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      return true;
-    }
-  }
-
-  /**
-   * The prefix used before each key which contains the number of shared and unshared
-   * bytes from the previous key that was read. The key prefix along with the previous key
-   * and the unshared key bytes allows one to construct the current key by doing the following
-   * {@code currentKey = previousKey[0 : sharedBytes] + read(unsharedBytes)}.
-   *
-   * <p>The key prefix is encoded as:
-   * <ul>
-   *   <li>number of shared key bytes (variable length integer coding)</li>
-   *   <li>number of unshared key bytes (variable length integer coding)</li>
-   * </ul>
-   */
-  @AutoValue
-  public abstract static class KeyPrefix {
-    /** Returns the number of leading bytes this key shares with the previous key. */
-    public abstract int getSharedKeySize();
-
-    /** Returns the number of bytes of this key that follow the shared prefix. */
-    public abstract int getUnsharedKeySize();
-
-    KeyPrefix() {} // Prevent public constructor, matching IsmRecord and IsmShard
-
-    /** Returns a KeyPrefix with the given shared and unshared key byte counts. */
-    public static KeyPrefix of(int sharedKeySize, int unsharedKeySize) {
-      return new AutoValue_IsmFormat_KeyPrefix(sharedKeySize, unsharedKeySize);
-    }
-  }
-
- /** A {@link Coder} for {@link KeyPrefix}. */
- public static final class KeyPrefixCoder extends AtomicCoder<KeyPrefix> {
- private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder();
-
- @JsonCreator
- public static KeyPrefixCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(KeyPrefix value, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- VarInt.encode(value.getSharedKeySize(), outStream);
- VarInt.encode(value.getUnsharedKeySize(), outStream);
- }
-
- @Override
- public KeyPrefix decode(InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- return KeyPrefix.of(VarInt.decodeInt(inStream), VarInt.decodeInt(inStream));
- }
-
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- @Override
- public boolean isRegisterByteSizeObserverCheap(KeyPrefix value, Coder.Context context) {
- return true;
- }
-
- @Override
- public long getEncodedElementByteSize(KeyPrefix value, Coder.Context context)
- throws Exception {
- Preconditions.checkNotNull(value);
- return VarInt.getLength(value.getSharedKeySize())
- + VarInt.getLength(value.getUnsharedKeySize());
- }
- }
-
-  /**
-   * The footer stores the relevant information required to locate the index and bloom filter.
-   * It also stores a version byte and the number of keys stored.
-   *
-   * <p>The footer is encoded as the value containing, in this order:
-   * <ul>
-   *   <li>start of shard index position offset (big endian long coding)</li>
-   *   <li>start of bloom filter offset (big endian long coding)</li>
-   *   <li>number of keys in file (big endian long coding)</li>
-   *   <li>{@link #VERSION} (version key as a single byte, currently 0x02)</li>
-   * </ul>
-   *
-   * <p>The encoded footer is always {@link #FIXED_LENGTH} bytes long.
-   */
-  @AutoValue
-  public abstract static class Footer {
-    /** Number of bytes in each big endian long field. */
-    public static final int LONG_BYTES = 8;
-    /** Total encoded size: three longs followed by the version byte. */
-    public static final int FIXED_LENGTH = 3 * LONG_BYTES + 1;
-    /** The only footer version that FooterCoder writes and accepts. */
-    public static final byte VERSION = 2;
-
-    public abstract byte getVersion();
-    public abstract long getIndexPosition();
-    public abstract long getBloomFilterPosition();
-    public abstract long getNumberOfKeys();
-
-    Footer() {} // Prevent public constructor, matching IsmRecord and IsmShard
-
-    /** Returns a footer at the current {@link #VERSION} with the given positions and key count. */
-    public static Footer of(long indexPosition, long bloomFilterPosition, long numberOfKeys) {
-      return new AutoValue_IsmFormat_Footer(
-          VERSION, indexPosition, bloomFilterPosition, numberOfKeys);
-    }
-  }
-
-  /** A {@link Coder} for {@link Footer}; the byte layout is described on {@link Footer}. */
-  public static final class FooterCoder extends AtomicCoder<Footer> {
-    private static final FooterCoder INSTANCE = new FooterCoder();
-
-    @JsonCreator
-    public static FooterCoder of() {
-      return INSTANCE;
-    }
-
-    @Override
-    public void encode(Footer value, OutputStream outStream, Coder.Context context)
-        throws CoderException, IOException {
-      DataOutputStream out = new DataOutputStream(outStream);
-      // Fixed-width layout: three big endian longs followed by a single version byte.
-      out.writeLong(value.getIndexPosition());
-      out.writeLong(value.getBloomFilterPosition());
-      out.writeLong(value.getNumberOfKeys());
-      out.write(Footer.VERSION);
-    }
-
-    /**
-     * Decodes a footer written by {@link #encode}.
-     *
-     * @throws IOException if the version byte is not {@link Footer#VERSION}, including when the
-     *     stream ends before it (read() then yields -1)
-     */
-    @Override
-    public Footer decode(InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      DataInputStream in = new DataInputStream(inStream);
-      long indexPosition = in.readLong();
-      long bloomFilterPosition = in.readLong();
-      long numberOfKeys = in.readLong();
-      int version = in.read();
-      if (version == Footer.VERSION) {
-        return Footer.of(indexPosition, bloomFilterPosition, numberOfKeys);
-      }
-      throw new IOException("Unknown version " + version + ". "
-          + "Only version 2 is currently supported.");
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      return true;
-    }
-
-    /** Cheap because every footer encodes to the same fixed number of bytes. */
-    @Override
-    public boolean isRegisterByteSizeObserverCheap(Footer value, Coder.Context context) {
-      return true;
-    }
-
-    @Override
-    public long getEncodedElementByteSize(Footer value, Coder.Context context)
-        throws Exception {
-      return Footer.FIXED_LENGTH;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java
deleted file mode 100644
index 6133148..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Implementation of the harness that runs on each Google Compute Engine instance to coordinate
- * execution of Pipeline code.
- */
-@ParametersAreNonnullByDefault
-package org.apache.beam.sdk.runners.worker;
-
-import javax.annotation.ParametersAreNonnullByDefault;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
deleted file mode 100644
index ae19a17..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import java.io.Serializable;
-
-/**
- * Wrapper class holding the necessary information to serialize a DoFn.
- *
- * @param <InputT> the type of the (main) input elements of the DoFn
- * @param <OutputT> the type of the (main) output elements of the DoFn
- */
-public class DoFnInfo<InputT, OutputT> implements Serializable {
- private final DoFn<InputT, OutputT> doFn;
- private final WindowingStrategy<?, ?> windowingStrategy;
- private final Iterable<PCollectionView<?>> sideInputViews;
- private final Coder<InputT> inputCoder;
-
- public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) {
- this.doFn = doFn;
- this.windowingStrategy = windowingStrategy;
- this.sideInputViews = null;
- this.inputCoder = null;
- }
-
- public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy,
- Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> inputCoder) {
- this.doFn = doFn;
- this.windowingStrategy = windowingStrategy;
- this.sideInputViews = sideInputViews;
- this.inputCoder = inputCoder;
- }
-
- public DoFn<InputT, OutputT> getDoFn() {
- return doFn;
- }
-
- public WindowingStrategy<?, ?> getWindowingStrategy() {
- return windowingStrategy;
- }
-
- public Iterable<PCollectionView<?>> getSideInputViews() {
- return sideInputViews;
- }
-
- public Coder<InputT> getInputCoder() {
- return inputCoder;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/OutputReference.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/OutputReference.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/OutputReference.java
deleted file mode 100644
index 5e30172..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/OutputReference.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.api.client.util.Preconditions.checkNotNull;
-
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.util.Key;
-
-/**
- * A representation used by {@link com.google.api.services.dataflow.model.Step}s
- * to reference the output of other {@code Step}s.
- */
-public final class OutputReference extends GenericJson {
- @Key("@type")
- public final String type = "OutputReference";
-
- @Key("step_name")
- private final String stepName;
-
- @Key("output_name")
- private final String outputName;
-
- public OutputReference(String stepName, String outputName) {
- this.stepName = checkNotNull(stepName);
- this.outputName = checkNotNull(outputName);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java
deleted file mode 100644
index 2d902f4..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.io.ByteStreams;
-import com.google.common.primitives.UnsignedBytes;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Comparator;
-
-import javax.annotation.concurrent.NotThreadSafe;
-
-/**
- * An elastic-sized byte array which allows you to manipulate it as a stream, or access
- * it directly. This allows for a quick succession of moving bytes from an {@link InputStream}
- * to this wrapper to be used as an {@link OutputStream} and vice versa. This wrapper
- * also provides random access to bytes stored within. This wrapper allows users to finely
- * control the number of byte copies that occur.
- *
- * Anything stored within the in-memory buffer from offset {@link #size()} is considered temporary
- * unused storage.
- */
-@NotThreadSafe
-public class RandomAccessData {
- /**
- * A {@link Coder} which encodes the valid parts of this stream.
- * This follows the same encoding scheme as {@link ByteArrayCoder}.
- * This coder is deterministic and consistent with equals.
- *
- * This coder does not support encoding positive infinity.
- */
- public static class RandomAccessDataCoder extends AtomicCoder<RandomAccessData> {
- private static final RandomAccessDataCoder INSTANCE = new RandomAccessDataCoder();
-
- @JsonCreator
- public static RandomAccessDataCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(RandomAccessData value, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- if (value == POSITIVE_INFINITY) {
- throw new CoderException("Positive infinity can not be encoded.");
- }
- if (!context.isWholeStream) {
- VarInt.encode(value.size, outStream);
- }
- value.writeTo(outStream, 0, value.size);
- }
-
- @Override
- public RandomAccessData decode(InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- RandomAccessData rval = new RandomAccessData();
- if (!context.isWholeStream) {
- int length = VarInt.decodeInt(inStream);
- rval.readFrom(inStream, 0, length);
- } else {
- ByteStreams.copy(inStream, rval.asOutputStream());
- }
- return rval;
- }
-
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- @Override
- public boolean isRegisterByteSizeObserverCheap(
- RandomAccessData value, Coder.Context context) {
- return true;
- }
-
- @Override
- protected long getEncodedElementByteSize(RandomAccessData value, Coder.Context context)
- throws Exception {
- if (value == null) {
- throw new CoderException("cannot encode a null in memory stream");
- }
- long size = 0;
- if (!context.isWholeStream) {
- size += VarInt.getLength(value.size);
- }
- return size + value.size;
- }
- }
-
- public static final UnsignedLexicographicalComparator UNSIGNED_LEXICOGRAPHICAL_COMPARATOR =
- new UnsignedLexicographicalComparator();
-
- /**
- * A {@link Comparator} that compares two byte arrays lexicographically. It compares
- * values as a list of unsigned bytes. The first pair of values that follow any common prefix,
- * or when one array is a prefix of the other, treats the shorter array as the lesser.
- * For example, [] < [0x01] < [0x01, 0x7F] < [0x01, 0x80] < [0x02] < POSITIVE INFINITY.
- *
- * <p>Note that a token type of positive infinity is supported and is greater than
- * all other {@link RandomAccessData}.
- */
- public static final class UnsignedLexicographicalComparator
- implements Comparator<RandomAccessData> {
- // Do not instantiate
- private UnsignedLexicographicalComparator() {
- }
-
- @Override
- public int compare(RandomAccessData o1, RandomAccessData o2) {
- return compare(o1, o2, 0 /* start from the beginning */);
- }
-
- /**
- * Compare the two sets of bytes starting at the given offset.
- */
- public int compare(RandomAccessData o1, RandomAccessData o2, int startOffset) {
- if (o1 == o2) {
- return 0;
- }
- if (o1 == POSITIVE_INFINITY) {
- return 1;
- }
- if (o2 == POSITIVE_INFINITY) {
- return -1;
- }
-
- int minBytesLen = Math.min(o1.size, o2.size);
- for (int i = startOffset; i < minBytesLen; i++) {
- // unsigned comparison
- int b1 = o1.buffer[i] & 0xFF;
- int b2 = o2.buffer[i] & 0xFF;
- if (b1 == b2) {
- continue;
- }
- // Return the stream with the smaller byte as the smaller value.
- return b1 - b2;
- }
- // If one is a prefix of the other, return the shorter one as the smaller one.
- // If both lengths are equal, then both streams are equal.
- return o1.size - o2.size;
- }
-
- /**
- * Compute the length of the common prefix of the two provided sets of bytes.
- */
- public int commonPrefixLength(RandomAccessData o1, RandomAccessData o2) {
- int minBytesLen = Math.min(o1.size, o2.size);
- for (int i = 0; i < minBytesLen; i++) {
- // unsigned comparison
- int b1 = o1.buffer[i] & 0xFF;
- int b2 = o2.buffer[i] & 0xFF;
- if (b1 != b2) {
- return i;
- }
- }
- return minBytesLen;
- }
- }
-
- /** A token type representing positive infinity. */
- static final RandomAccessData POSITIVE_INFINITY = new RandomAccessData(0);
-
- /**
- * Returns a RandomAccessData that is the smallest value of same length which
- * is strictly greater than this. Note that if this is empty or is all 0xFF then
- * a token value of positive infinity is returned.
- *
- * The {@link UnsignedLexicographicalComparator} supports comparing {@link RandomAccessData}
- * with support for positive infinitiy.
- */
- public RandomAccessData increment() throws IOException {
- RandomAccessData copy = copy();
- for (int i = copy.size - 1; i >= 0; --i) {
- if (copy.buffer[i] != UnsignedBytes.MAX_VALUE) {
- copy.buffer[i] = UnsignedBytes.checkedCast(UnsignedBytes.toInt(copy.buffer[i]) + 1);
- return copy;
- }
- }
- return POSITIVE_INFINITY;
- }
-
- private static final int DEFAULT_INITIAL_BUFFER_SIZE = 128;
-
- /** Constructs a RandomAccessData with a default buffer size. */
- public RandomAccessData() {
- this(DEFAULT_INITIAL_BUFFER_SIZE);
- }
-
- /** Constructs a RandomAccessData with the initial buffer. */
- public RandomAccessData(byte[] initialBuffer) {
- checkNotNull(initialBuffer);
- this.buffer = initialBuffer;
- this.size = initialBuffer.length;
- }
-
- /** Constructs a RandomAccessData with the given buffer size. */
- public RandomAccessData(int initialBufferSize) {
- checkArgument(initialBufferSize >= 0, "Expected initial buffer size to be greater than zero.");
- this.buffer = new byte[initialBufferSize];
- }
-
- private byte[] buffer;
- private int size;
-
- /** Returns the backing array. */
- public byte[] array() {
- return buffer;
- }
-
- /** Returns the number of bytes in the backing array that are valid. */
- public int size() {
- return size;
- }
-
- /** Resets the end of the stream to the specified position. */
- public void resetTo(int position) {
- ensureCapacity(position);
- size = position;
- }
-
- private final OutputStream outputStream = new OutputStream() {
- @Override
- public void write(int b) throws IOException {
- ensureCapacity(size + 1);
- buffer[size] = (byte) b;
- size += 1;
- }
-
- @Override
- public void write(byte[] b, int offset, int length) throws IOException {
- ensureCapacity(size + length);
- System.arraycopy(b, offset, buffer, size, length);
- size += length;
- }
- };
-
- /**
- * Returns an output stream which writes to the backing buffer from the current position.
- * Note that the internal buffer will grow as required to accomodate all data written.
- */
- public OutputStream asOutputStream() {
- return outputStream;
- }
-
- /**
- * Returns an {@link InputStream} wrapper which supplies the portion of this backing byte buffer
- * starting at {@code offset} and up to {@code length} bytes. Note that the returned
- * {@link InputStream} is only a wrapper and any modifications to the underlying
- * {@link RandomAccessData} will be visible by the {@link InputStream}.
- */
- public InputStream asInputStream(final int offset, final int length) {
- return new ByteArrayInputStream(buffer, offset, length);
- }
-
- /**
- * Writes {@code length} bytes starting at {@code offset} from the backing data store to the
- * specified output stream.
- */
- public void writeTo(OutputStream out, int offset, int length) throws IOException {
- out.write(buffer, offset, length);
- }
-
- /**
- * Reads {@code length} bytes from the specified input stream writing them into the backing
- * data store starting at {@code offset}.
- *
- * <p>Note that the in memory stream will be grown to ensure there is enough capacity.
- */
- public void readFrom(InputStream inStream, int offset, int length) throws IOException {
- ensureCapacity(offset + length);
- ByteStreams.readFully(inStream, buffer, offset, length);
- size = offset + length;
- }
-
- /** Returns a copy of this RandomAccessData. */
- public RandomAccessData copy() throws IOException {
- RandomAccessData copy = new RandomAccessData(size);
- writeTo(copy.asOutputStream(), 0, size);
- return copy;
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == this) {
- return true;
- }
- if (!(other instanceof RandomAccessData)) {
- return false;
- }
- return UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(this, (RandomAccessData) other) == 0;
- }
-
- @Override
- public int hashCode() {
- int result = 1;
- for (int i = 0; i < size; ++i) {
- result = 31 * result + buffer[i];
- }
-
- return result;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("buffer", Arrays.copyOf(buffer, size))
- .add("size", size)
- .toString();
- }
-
- private void ensureCapacity(int minCapacity) {
- // If we have enough space, don't grow the buffer.
- if (minCapacity <= buffer.length) {
- return;
- }
-
- // Try to double the size of the buffer, if thats not enough, just use the new capacity.
- // Note that we use Math.min(long, long) to not cause overflow on the multiplication.
- int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.length * 2L);
- if (newCapacity < minCapacity) {
- newCapacity = minCapacity;
- }
- buffer = Arrays.copyOf(buffer, newCapacity);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/TimeUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/TimeUtil.java
deleted file mode 100644
index db5c760..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/TimeUtil.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.joda.time.DateTime;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.ReadableDuration;
-import org.joda.time.ReadableInstant;
-import org.joda.time.chrono.ISOChronology;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-/**
- * A helper class for converting between Dataflow API and SDK time
- * representations.
- *
- * <p>Dataflow API times are strings of the form
- * {@code YYYY-MM-dd'T'HH:mm:ss[.nnnn]'Z'}: that is, RFC 3339
- * strings with optional fractional seconds and a 'Z' offset.
- *
- * <p>Dataflow API durations are strings of the form {@code ['-']sssss[.nnnn]'s'}:
- * that is, seconds with optional fractional seconds and a literal 's' at the end.
- *
- * <p>In both formats, fractional seconds are either three digits (millisecond
- * resolution), six digits (microsecond resolution), or nine digits (nanosecond
- * resolution).
- */
-public final class TimeUtil {
- private TimeUtil() {} // Non-instantiable.
-
- private static final Pattern DURATION_PATTERN = Pattern.compile("(\\d+)(?:\\.(\\d+))?s");
- private static final Pattern TIME_PATTERN =
- Pattern.compile("(\\d{4})-(\\d{2})-(\\d{2})T(\\d{2}):(\\d{2}):(\\d{2})(?:\\.(\\d+))?Z");
-
- /**
- * Converts a {@link ReadableInstant} into a Dateflow API time value.
- */
- public static String toCloudTime(ReadableInstant instant) {
- // Note that since Joda objects use millisecond resolution, we always
- // produce either no fractional seconds or fractional seconds with
- // millisecond resolution.
-
- // Translate the ReadableInstant to a DateTime with ISOChronology.
- DateTime time = new DateTime(instant);
-
- int millis = time.getMillisOfSecond();
- if (millis == 0) {
- return String.format("%04d-%02d-%02dT%02d:%02d:%02dZ",
- time.getYear(),
- time.getMonthOfYear(),
- time.getDayOfMonth(),
- time.getHourOfDay(),
- time.getMinuteOfHour(),
- time.getSecondOfMinute());
- } else {
- return String.format("%04d-%02d-%02dT%02d:%02d:%02d.%03dZ",
- time.getYear(),
- time.getMonthOfYear(),
- time.getDayOfMonth(),
- time.getHourOfDay(),
- time.getMinuteOfHour(),
- time.getSecondOfMinute(),
- millis);
- }
- }
-
- /**
- * Converts a time value received via the Dataflow API into the corresponding
- * {@link Instant}.
- * @return the parsed time, or null if a parse error occurs
- */
- @Nullable
- public static Instant fromCloudTime(String time) {
- Matcher matcher = TIME_PATTERN.matcher(time);
- if (!matcher.matches()) {
- return null;
- }
- int year = Integer.valueOf(matcher.group(1));
- int month = Integer.valueOf(matcher.group(2));
- int day = Integer.valueOf(matcher.group(3));
- int hour = Integer.valueOf(matcher.group(4));
- int minute = Integer.valueOf(matcher.group(5));
- int second = Integer.valueOf(matcher.group(6));
- int millis = 0;
-
- String frac = matcher.group(7);
- if (frac != null) {
- int fracs = Integer.valueOf(frac);
- if (frac.length() == 3) { // millisecond resolution
- millis = fracs;
- } else if (frac.length() == 6) { // microsecond resolution
- millis = fracs / 1000;
- } else if (frac.length() == 9) { // nanosecond resolution
- millis = fracs / 1000000;
- } else {
- return null;
- }
- }
-
- return new DateTime(year, month, day, hour, minute, second, millis,
- ISOChronology.getInstanceUTC()).toInstant();
- }
-
- /**
- * Converts a {@link ReadableDuration} into a Dataflow API duration string.
- */
- public static String toCloudDuration(ReadableDuration duration) {
- // Note that since Joda objects use millisecond resolution, we always
- // produce either no fractional seconds or fractional seconds with
- // millisecond resolution.
- long millis = duration.getMillis();
- long seconds = millis / 1000;
- millis = millis % 1000;
- if (millis == 0) {
- return String.format("%ds", seconds);
- } else {
- return String.format("%d.%03ds", seconds, millis);
- }
- }
-
- /**
- * Converts a Dataflow API duration string into a {@link Duration}.
- * @return the parsed duration, or null if a parse error occurs
- */
- @Nullable
- public static Duration fromCloudDuration(String duration) {
- Matcher matcher = DURATION_PATTERN.matcher(duration);
- if (!matcher.matches()) {
- return null;
- }
- long millis = Long.valueOf(matcher.group(1)) * 1000;
- String frac = matcher.group(2);
- if (frac != null) {
- long fracs = Long.valueOf(frac);
- if (frac.length() == 3) { // millisecond resolution
- millis += fracs;
- } else if (frac.length() == 6) { // microsecond resolution
- millis += fracs / 1000;
- } else if (frac.length() == 9) { // nanosecond resolution
- millis += fracs / 1000000;
- } else {
- return null;
- }
- }
- return Duration.millis(millis);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
index 79e281e..d4d4b3b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
@@ -42,6 +42,10 @@ import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsList;
import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsMap;
import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsMultimap;
import org.apache.beam.runners.dataflow.DataflowPipelineRunner.TransformedMap;
+import org.apache.beam.runners.dataflow.internal.IsmFormat;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
@@ -60,10 +64,6 @@ import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
-import org.apache.beam.sdk.runners.worker.IsmFormat;
-import org.apache.beam.sdk.runners.worker.IsmFormat.IsmRecord;
-import org.apache.beam.sdk.runners.worker.IsmFormat.IsmRecordCoder;
-import org.apache.beam.sdk.runners.worker.IsmFormat.MetadataKeyCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.PTransform;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 58b38d2..8e7ed96 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -38,6 +38,7 @@ import static org.mockito.Mockito.when;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
+import org.apache.beam.runners.dataflow.util.OutputReference;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -55,7 +56,6 @@ import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.OutputReference;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.Structs;
import org.apache.beam.sdk.util.TestCredential;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
index d6de501..a45284c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
@@ -34,6 +34,7 @@ import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
+import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -43,7 +44,6 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.NoopPathValidator;
import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.TimeUtil;
import org.apache.beam.sdk.util.Transport;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
index ee1532d..4b0ab2f 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
@@ -26,7 +26,6 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.TimeUtil;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.JobMessage;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java
new file mode 100644
index 0000000..c5c77cf
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.runners.dataflow.util.RandomAccessData.RandomAccessDataCoder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.testing.CoderProperties;
+
+import com.google.common.primitives.UnsignedBytes;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.Arrays;
+
+/**
+ * Tests for {@link RandomAccessData}: its coder, the unsigned lexicographical comparator,
+ * equals/hashCode, input/output stream access, buffer growth and {@code increment()}.
+ */
+@RunWith(JUnit4.class)
+public class RandomAccessDataTest {
+ // TEST_DATA_B and TEST_DATA_C share the prefix { 0x06, 0x05 } and first differ at index 2;
+ // both hold 0x03 at index 3.
+ private static final byte[] TEST_DATA_A = new byte[]{ 0x01, 0x02, 0x03 };
+ private static final byte[] TEST_DATA_B = new byte[]{ 0x06, 0x05, 0x04, 0x03 };
+ private static final byte[] TEST_DATA_C = new byte[]{ 0x06, 0x05, 0x03, 0x03 };
+
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+
+ /** Checks the standard coder properties and the encoded size in both contexts. */
+ @Test
+ public void testCoder() throws Exception {
+ // streamA and streamB are distinct objects holding the same bytes (TEST_DATA_A).
+ RandomAccessData streamA = new RandomAccessData();
+ streamA.asOutputStream().write(TEST_DATA_A);
+ RandomAccessData streamB = new RandomAccessData();
+ streamB.asOutputStream().write(TEST_DATA_A);
+ CoderProperties.coderDecodeEncodeEqual(RandomAccessDataCoder.of(), streamA);
+ CoderProperties.coderDeterministic(RandomAccessDataCoder.of(), streamA, streamB);
+ CoderProperties.coderConsistentWithEquals(RandomAccessDataCoder.of(), streamA, streamB);
+ CoderProperties.coderSerializable(RandomAccessDataCoder.of());
+ CoderProperties.structuralValueConsistentWithEquals(
+ RandomAccessDataCoder.of(), streamA, streamB);
+ assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA, Context.NESTED));
+ assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA, Context.OUTER));
+ // For 3 bytes of data the NESTED size is expected to be one byte larger than OUTER;
+ // presumably a length prefix — TODO confirm against RandomAccessDataCoder.
+ assertEquals(4, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA, Context.NESTED));
+ assertEquals(3, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA, Context.OUTER));
+ }
+
+ /** Encoding {@link RandomAccessData#POSITIVE_INFINITY} must fail with a {@link CoderException}. */
+ @Test
+ public void testCoderWithPositiveInfinityIsError() throws Exception {
+ expectedException.expect(CoderException.class);
+ expectedException.expectMessage("Positive infinity can not be encoded");
+ RandomAccessDataCoder.of().encode(
+ RandomAccessData.POSITIVE_INFINITY, new ByteArrayOutputStream(), Context.OUTER);
+ }
+
+ /** Checks ordering, common prefix length, start offsets and POSITIVE_INFINITY ordering. */
+ @Test
+ public void testLexicographicalComparator() throws Exception {
+ RandomAccessData streamA = new RandomAccessData();
+ streamA.asOutputStream().write(TEST_DATA_A);
+ RandomAccessData streamB = new RandomAccessData();
+ streamB.asOutputStream().write(TEST_DATA_B);
+ RandomAccessData streamC = new RandomAccessData();
+ streamC.asOutputStream().write(TEST_DATA_C);
+ // A sorts before B: their first bytes are 0x01 and 0x06.
+ assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
+ streamA, streamB) < 0);
+ assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
+ streamB, streamA) > 0);
+ assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
+ streamB, streamB) == 0);
+ // Check common prefix length.
+ // B and C agree on { 0x06, 0x05 } and differ at index 2.
+ assertEquals(2, RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.commonPrefixLength(
+ streamB, streamC));
+ // Check that we honor the start offset.
+ // From index 3 onward B and C both contain only 0x03, so they compare equal.
+ assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
+ streamB, streamC, 3) == 0);
+ // Test positive infinity comparisons.
+ // POSITIVE_INFINITY sorts after streamA and compares equal to itself.
+ assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
+ streamA, RandomAccessData.POSITIVE_INFINITY) < 0);
+ assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
+ RandomAccessData.POSITIVE_INFINITY, RandomAccessData.POSITIVE_INFINITY) == 0);
+ assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
+ RandomAccessData.POSITIVE_INFINITY, streamA) > 0);
+ }
+
+ /** Checks that equality and hash codes depend on content, not object identity. */
+ @Test
+ public void testEqualsAndHashCode() throws Exception {
+ // Test that equality by reference works
+ RandomAccessData streamA = new RandomAccessData();
+ streamA.asOutputStream().write(TEST_DATA_A);
+ assertEquals(streamA, streamA);
+ assertEquals(streamA.hashCode(), streamA.hashCode());
+
+ // Test different objects containing the same data are the same
+ RandomAccessData streamACopy = new RandomAccessData();
+ streamACopy.asOutputStream().write(TEST_DATA_A);
+ assertEquals(streamA, streamACopy);
+ assertEquals(streamA.hashCode(), streamACopy.hashCode());
+
+ // Test same length streams with different data differ
+ // NOTE(review): the hashCode inequality checks below rely on these particular inputs
+ // not colliding; unequal objects are not required to have different hash codes.
+ RandomAccessData streamB = new RandomAccessData();
+ streamB.asOutputStream().write(new byte[]{ 0x01, 0x02, 0x04 });
+ assertNotEquals(streamA, streamB);
+ assertNotEquals(streamA.hashCode(), streamB.hashCode());
+
+ // Test different length streams differ
+ // (streamB now holds { 0x01, 0x02, 0x04 } followed by TEST_DATA_B.)
+ streamB.asOutputStream().write(TEST_DATA_B);
+ assertNotEquals(streamA, streamB);
+ assertNotEquals(streamA.hashCode(), streamB.hashCode());
+ }
+
+ /** Checks that resetTo truncates the logical size and that later writes append from there. */
+ @Test
+ public void testResetTo() throws Exception {
+ RandomAccessData stream = new RandomAccessData();
+ stream.asOutputStream().write(TEST_DATA_A);
+ // Keep only the first byte (0x01); the next write appends TEST_DATA_A after it.
+ stream.resetTo(1);
+ assertEquals(1, stream.size());
+ stream.asOutputStream().write(TEST_DATA_A);
+ assertArrayEquals(new byte[]{ 0x01, 0x01, 0x02, 0x03 },
+ Arrays.copyOf(stream.array(), stream.size()));
+ }
+
+ /** Checks reading a sub-range through asInputStream. */
+ @Test
+ public void testAsInputStream() throws Exception {
+ RandomAccessData stream = new RandomAccessData();
+ stream.asOutputStream().write(TEST_DATA_A);
+ // asInputStream(1, 1) is expected to expose exactly one byte: the one at index 1 (0x02).
+ InputStream in = stream.asInputStream(1, 1);
+ assertEquals(0x02, in.read());
+ assertEquals(-1, in.read());
+ in.close();
+ }
+
+ /** Checks that readFrom places input bytes at the requested offset. */
+ @Test
+ public void testReadFrom() throws Exception {
+ ByteArrayInputStream bais = new ByteArrayInputStream(TEST_DATA_A);
+ RandomAccessData stream = new RandomAccessData();
+ // Expected: 2 bytes from the input land at index 3, and indices 0-2 read as zero.
+ stream.readFrom(bais, 3, 2);
+ assertArrayEquals(new byte[]{ 0x00, 0x00, 0x00, 0x01, 0x02 },
+ Arrays.copyOf(stream.array(), stream.size()));
+ bais.close();
+ }
+
+ /** Checks that writeTo copies out a sub-range of the buffer. */
+ @Test
+ public void testWriteTo() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ RandomAccessData stream = new RandomAccessData();
+ stream.asOutputStream().write(TEST_DATA_B);
+ // Expected: the 2 bytes starting at index 1 of TEST_DATA_B, i.e. 0x05, 0x04.
+ stream.writeTo(baos, 1, 2);
+ assertArrayEquals(new byte[]{ 0x05, 0x04 }, baos.toByteArray());
+ baos.close();
+ }
+
+ /** A zero-capacity buffer grows when reset past its end; new positions read as zero. */
+ @Test
+ public void testThatRandomAccessDataGrowsWhenResettingToPositionBeyondEnd() throws Exception {
+ RandomAccessData stream = new RandomAccessData(0);
+ assertArrayEquals(new byte[0], stream.array());
+ stream.resetTo(3); // force resize
+ assertArrayEquals(new byte[]{ 0x00, 0x00, 0x00 }, stream.array());
+ }
+
+ /** A zero-capacity buffer grows to hold data read via readFrom. */
+ @Test
+ public void testThatRandomAccessDataGrowsWhenReading() throws Exception {
+ RandomAccessData stream = new RandomAccessData(0);
+ assertArrayEquals(new byte[0], stream.array());
+ stream.readFrom(new ByteArrayInputStream(TEST_DATA_A), 0, TEST_DATA_A.length);
+ assertArrayEquals(TEST_DATA_A,
+ Arrays.copyOf(stream.array(), TEST_DATA_A.length));
+ }
+
+ /** Checks increment() on ordinary values and the cases that yield POSITIVE_INFINITY. */
+ @Test
+ public void testIncrement() throws Exception {
+ // The expected values show the last byte that is not UnsignedBytes.MAX_VALUE being
+ // incremented, with trailing MAX_VALUE bytes left unchanged.
+ assertEquals(new RandomAccessData(new byte[]{ 0x00, 0x01 }),
+ new RandomAccessData(new byte[]{ 0x00, 0x00 }).increment());
+ assertEquals(new RandomAccessData(new byte[]{ 0x01, UnsignedBytes.MAX_VALUE }),
+ new RandomAccessData(new byte[]{ 0x00, UnsignedBytes.MAX_VALUE }).increment());
+
+ // Test for positive infinity
+ // Empty data, all-MAX_VALUE data and POSITIVE_INFINITY itself all increment to the
+ // POSITIVE_INFINITY instance (checked by identity).
+ assertSame(RandomAccessData.POSITIVE_INFINITY, new RandomAccessData(new byte[0]).increment());
+ assertSame(RandomAccessData.POSITIVE_INFINITY,
+ new RandomAccessData(new byte[]{ UnsignedBytes.MAX_VALUE }).increment());
+ assertSame(RandomAccessData.POSITIVE_INFINITY, RandomAccessData.POSITIVE_INFINITY.increment());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
new file mode 100644
index 0000000..c22c223
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudDuration;
+import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
+import static org.apache.beam.runners.dataflow.util.TimeUtil.toCloudDuration;
+import static org.apache.beam.runners.dataflow.util.TimeUtil.toCloudTime;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link TimeUtil}: formatting and parsing of cloud time strings and
+ * cloud duration strings.
+ */
+@RunWith(JUnit4.class)
+public final class TimeUtilTest {
+ /** Instants format as UTC timestamps ending in 'Z'; whole seconds omit the fraction. */
+ @Test
+ public void toCloudTimeShouldPrintTimeStrings() {
+ assertEquals("1970-01-01T00:00:00Z", toCloudTime(new Instant(0)));
+ assertEquals("1970-01-01T00:00:00.001Z", toCloudTime(new Instant(1)));
+ }
+
+ /** Checks that time strings with 0, 3, 6 or 9 fractional digits parse to millisecond instants. */
+ @Test
+ public void fromCloudTimeShouldParseTimeStrings() {
+ assertEquals(new Instant(0), fromCloudTime("1970-01-01T00:00:00Z"));
+ assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001Z"));
+ // Digits beyond millisecond precision are accepted; these all map to the same millisecond.
+ assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000Z"));
+ assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001001Z"));
+ assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000000Z"));
+ assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000001Z"));
+ // Unparseable input (empty, or missing the trailing 'Z') yields null rather than throwing.
+ assertNull(fromCloudTime(""));
+ assertNull(fromCloudTime("1970-01-01T00:00:00"));
+ }
+
+ /** Durations format as seconds with an 's' suffix; whole seconds omit the fraction. */
+ @Test
+ public void toCloudDurationShouldPrintDurationStrings() {
+ assertEquals("0s", toCloudDuration(Duration.ZERO));
+ assertEquals("4s", toCloudDuration(Duration.millis(4000)));
+ assertEquals("4.001s", toCloudDuration(Duration.millis(4001)));
+ }
+
+ /** Checks that duration strings with 0, 3, 6 or 9 fractional digits parse to milliseconds. */
+ @Test
+ public void fromCloudDurationShouldParseDurationStrings() {
+ assertEquals(Duration.millis(4000), fromCloudDuration("4s"));
+ assertEquals(Duration.millis(4001), fromCloudDuration("4.001s"));
+ // Digits beyond millisecond precision are accepted; these all map to the same millisecond.
+ assertEquals(Duration.millis(4001), fromCloudDuration("4.001000s"));
+ assertEquals(Duration.millis(4001), fromCloudDuration("4.001001s"));
+ assertEquals(Duration.millis(4001), fromCloudDuration("4.001000000s"));
+ assertEquals(Duration.millis(4001), fromCloudDuration("4.001000001s"));
+ // Rejected with null: empty input, a missing 's' suffix, and a single fractional
+ // digit ("4.1s").
+ assertNull(fromCloudDuration(""));
+ assertNull(fromCloudDuration("4"));
+ assertNull(fromCloudDuration("4.1"));
+ assertNull(fromCloudDuration("4.1s"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java
deleted file mode 100644
index b990212..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.util.RandomAccessData.RandomAccessDataCoder;
-
-import com.google.common.primitives.UnsignedBytes;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.util.Arrays;
-
-/**
- * Tests for {@link RandomAccessData}: its coder, the unsigned lexicographical comparator,
- * equals/hashCode, input/output stream access, buffer growth and {@code increment()}.
- */
-@RunWith(JUnit4.class)
-public class RandomAccessDataTest {
- // TEST_DATA_B and TEST_DATA_C share the prefix { 0x06, 0x05 } and first differ at index 2;
- // both hold 0x03 at index 3.
- private static final byte[] TEST_DATA_A = new byte[]{ 0x01, 0x02, 0x03 };
- private static final byte[] TEST_DATA_B = new byte[]{ 0x06, 0x05, 0x04, 0x03 };
- private static final byte[] TEST_DATA_C = new byte[]{ 0x06, 0x05, 0x03, 0x03 };
-
- @Rule public ExpectedException expectedException = ExpectedException.none();
-
- /** Checks the standard coder properties and the encoded size in both contexts. */
- @Test
- public void testCoder() throws Exception {
- // streamA and streamB are distinct objects holding the same bytes (TEST_DATA_A).
- RandomAccessData streamA = new RandomAccessData();
- streamA.asOutputStream().write(TEST_DATA_A);
- RandomAccessData streamB = new RandomAccessData();
- streamB.asOutputStream().write(TEST_DATA_A);
- CoderProperties.coderDecodeEncodeEqual(RandomAccessDataCoder.of(), streamA);
- CoderProperties.coderDeterministic(RandomAccessDataCoder.of(), streamA, streamB);
- CoderProperties.coderConsistentWithEquals(RandomAccessDataCoder.of(), streamA, streamB);
- CoderProperties.coderSerializable(RandomAccessDataCoder.of());
- CoderProperties.structuralValueConsistentWithEquals(
- RandomAccessDataCoder.of(), streamA, streamB);
- assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA, Context.NESTED));
- assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA, Context.OUTER));
- // For 3 bytes of data the NESTED size is expected to be one byte larger than OUTER;
- // presumably a length prefix — TODO confirm against RandomAccessDataCoder.
- assertEquals(4, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA, Context.NESTED));
- assertEquals(3, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA, Context.OUTER));
- }
-
- /** Encoding {@link RandomAccessData#POSITIVE_INFINITY} must fail with a {@link CoderException}. */
- @Test
- public void testCoderWithPositiveInfinityIsError() throws Exception {
- expectedException.expect(CoderException.class);
- expectedException.expectMessage("Positive infinity can not be encoded");
- RandomAccessDataCoder.of().encode(
- RandomAccessData.POSITIVE_INFINITY, new ByteArrayOutputStream(), Context.OUTER);
- }
-
- /** Checks ordering, common prefix length, start offsets and POSITIVE_INFINITY ordering. */
- @Test
- public void testLexicographicalComparator() throws Exception {
- RandomAccessData streamA = new RandomAccessData();
- streamA.asOutputStream().write(TEST_DATA_A);
- RandomAccessData streamB = new RandomAccessData();
- streamB.asOutputStream().write(TEST_DATA_B);
- RandomAccessData streamC = new RandomAccessData();
- streamC.asOutputStream().write(TEST_DATA_C);
- // A sorts before B: their first bytes are 0x01 and 0x06.
- assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
- streamA, streamB) < 0);
- assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
- streamB, streamA) > 0);
- assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
- streamB, streamB) == 0);
- // Check common prefix length.
- // B and C agree on { 0x06, 0x05 } and differ at index 2.
- assertEquals(2, RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.commonPrefixLength(
- streamB, streamC));
- // Check that we honor the start offset.
- // From index 3 onward B and C both contain only 0x03, so they compare equal.
- assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
- streamB, streamC, 3) == 0);
- // Test positive infinity comparisons.
- // POSITIVE_INFINITY sorts after streamA and compares equal to itself.
- assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
- streamA, RandomAccessData.POSITIVE_INFINITY) < 0);
- assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
- RandomAccessData.POSITIVE_INFINITY, RandomAccessData.POSITIVE_INFINITY) == 0);
- assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
- RandomAccessData.POSITIVE_INFINITY, streamA) > 0);
- }
-
- /** Checks that equality and hash codes depend on content, not object identity. */
- @Test
- public void testEqualsAndHashCode() throws Exception {
- // Test that equality by reference works
- RandomAccessData streamA = new RandomAccessData();
- streamA.asOutputStream().write(TEST_DATA_A);
- assertEquals(streamA, streamA);
- assertEquals(streamA.hashCode(), streamA.hashCode());
-
- // Test different objects containing the same data are the same
- RandomAccessData streamACopy = new RandomAccessData();
- streamACopy.asOutputStream().write(TEST_DATA_A);
- assertEquals(streamA, streamACopy);
- assertEquals(streamA.hashCode(), streamACopy.hashCode());
-
- // Test same length streams with different data differ
- // NOTE(review): the hashCode inequality checks below rely on these particular inputs
- // not colliding; unequal objects are not required to have different hash codes.
- RandomAccessData streamB = new RandomAccessData();
- streamB.asOutputStream().write(new byte[]{ 0x01, 0x02, 0x04 });
- assertNotEquals(streamA, streamB);
- assertNotEquals(streamA.hashCode(), streamB.hashCode());
-
- // Test different length streams differ
- // (streamB now holds { 0x01, 0x02, 0x04 } followed by TEST_DATA_B.)
- streamB.asOutputStream().write(TEST_DATA_B);
- assertNotEquals(streamA, streamB);
- assertNotEquals(streamA.hashCode(), streamB.hashCode());
- }
-
- /** Checks that resetTo truncates the logical size and that later writes append from there. */
- @Test
- public void testResetTo() throws Exception {
- RandomAccessData stream = new RandomAccessData();
- stream.asOutputStream().write(TEST_DATA_A);
- // Keep only the first byte (0x01); the next write appends TEST_DATA_A after it.
- stream.resetTo(1);
- assertEquals(1, stream.size());
- stream.asOutputStream().write(TEST_DATA_A);
- assertArrayEquals(new byte[]{ 0x01, 0x01, 0x02, 0x03 },
- Arrays.copyOf(stream.array(), stream.size()));
- }
-
- /** Checks reading a sub-range through asInputStream. */
- @Test
- public void testAsInputStream() throws Exception {
- RandomAccessData stream = new RandomAccessData();
- stream.asOutputStream().write(TEST_DATA_A);
- // asInputStream(1, 1) is expected to expose exactly one byte: the one at index 1 (0x02).
- InputStream in = stream.asInputStream(1, 1);
- assertEquals(0x02, in.read());
- assertEquals(-1, in.read());
- in.close();
- }
-
- /** Checks that readFrom places input bytes at the requested offset. */
- @Test
- public void testReadFrom() throws Exception {
- ByteArrayInputStream bais = new ByteArrayInputStream(TEST_DATA_A);
- RandomAccessData stream = new RandomAccessData();
- // Expected: 2 bytes from the input land at index 3, and indices 0-2 read as zero.
- stream.readFrom(bais, 3, 2);
- assertArrayEquals(new byte[]{ 0x00, 0x00, 0x00, 0x01, 0x02 },
- Arrays.copyOf(stream.array(), stream.size()));
- bais.close();
- }
-
- /** Checks that writeTo copies out a sub-range of the buffer. */
- @Test
- public void testWriteTo() throws Exception {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- RandomAccessData stream = new RandomAccessData();
- stream.asOutputStream().write(TEST_DATA_B);
- // Expected: the 2 bytes starting at index 1 of TEST_DATA_B, i.e. 0x05, 0x04.
- stream.writeTo(baos, 1, 2);
- assertArrayEquals(new byte[]{ 0x05, 0x04 }, baos.toByteArray());
- baos.close();
- }
-
- /** A zero-capacity buffer grows when reset past its end; new positions read as zero. */
- @Test
- public void testThatRandomAccessDataGrowsWhenResettingToPositionBeyondEnd() throws Exception {
- RandomAccessData stream = new RandomAccessData(0);
- assertArrayEquals(new byte[0], stream.array());
- stream.resetTo(3); // force resize
- assertArrayEquals(new byte[]{ 0x00, 0x00, 0x00 }, stream.array());
- }
-
- /** A zero-capacity buffer grows to hold data read via readFrom. */
- @Test
- public void testThatRandomAccessDataGrowsWhenReading() throws Exception {
- RandomAccessData stream = new RandomAccessData(0);
- assertArrayEquals(new byte[0], stream.array());
- stream.readFrom(new ByteArrayInputStream(TEST_DATA_A), 0, TEST_DATA_A.length);
- assertArrayEquals(TEST_DATA_A,
- Arrays.copyOf(stream.array(), TEST_DATA_A.length));
- }
-
- /** Checks increment() on ordinary values and the cases that yield POSITIVE_INFINITY. */
- @Test
- public void testIncrement() throws Exception {
- // The expected values show the last byte that is not UnsignedBytes.MAX_VALUE being
- // incremented, with trailing MAX_VALUE bytes left unchanged.
- assertEquals(new RandomAccessData(new byte[]{ 0x00, 0x01 }),
- new RandomAccessData(new byte[]{ 0x00, 0x00 }).increment());
- assertEquals(new RandomAccessData(new byte[]{ 0x01, UnsignedBytes.MAX_VALUE }),
- new RandomAccessData(new byte[]{ 0x00, UnsignedBytes.MAX_VALUE }).increment());
-
- // Test for positive infinity
- // Empty data, all-MAX_VALUE data and POSITIVE_INFINITY itself all increment to the
- // POSITIVE_INFINITY instance (checked by identity).
- assertSame(RandomAccessData.POSITIVE_INFINITY, new RandomAccessData(new byte[0]).increment());
- assertSame(RandomAccessData.POSITIVE_INFINITY,
- new RandomAccessData(new byte[]{ UnsignedBytes.MAX_VALUE }).increment());
- assertSame(RandomAccessData.POSITIVE_INFINITY, RandomAccessData.POSITIVE_INFINITY.increment());
- }
-}
[6/7] incubator-beam git commit: [BEAM-151] Move over some more
Dataflow specific classes.
Posted by lc...@apache.org.
[BEAM-151] Move over some more Dataflow specific classes.
Note that users should use proto ByteString instead of RandomAccessData
since it provides a safer version of the same functionality.
I hoped that I would be able to move over more of the *Cloud* classes
and their helpers, but they are embedded in the coders. Nothing more
can be done here until there is an official Beam representation of a coder
decoupled from Dataflow CloudKnownTypes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/27979d76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/27979d76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/27979d76
Branch: refs/heads/master
Commit: 27979d76666ea8a5ea2d5efe1ac3159cebaeec97
Parents: 81bf4d9
Author: Luke Cwik <lc...@google.com>
Authored: Tue Apr 26 11:43:26 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Apr 27 17:26:30 2016 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 6 +
.../beam/sdk/runners/worker/IsmFormat.java | 811 +++++++++++++++++++
.../beam/sdk/runners/worker/package-info.java | 25 +
.../java/org/apache/beam/sdk/util/DoFnInfo.java | 68 ++
.../apache/beam/sdk/util/OutputReference.java | 43 +
.../apache/beam/sdk/util/RandomAccessData.java | 355 ++++++++
.../java/org/apache/beam/sdk/util/TimeUtil.java | 166 ++++
.../beam/sdk/util/RandomAccessDataTest.java | 207 +++++
.../org/apache/beam/sdk/util/TimeUtilTest.java | 75 ++
.../beam/sdk/runners/worker/IsmFormat.java | 811 -------------------
.../beam/sdk/runners/worker/package-info.java | 25 -
.../java/org/apache/beam/sdk/util/DoFnInfo.java | 68 --
.../apache/beam/sdk/util/OutputReference.java | 43 -
.../apache/beam/sdk/util/RandomAccessData.java | 355 --------
.../java/org/apache/beam/sdk/util/TimeUtil.java | 166 ----
.../sdk/util/common/worker/package-info.java | 19 -
.../beam/sdk/util/RandomAccessDataTest.java | 207 -----
.../org/apache/beam/sdk/util/TimeUtilTest.java | 75 --
18 files changed, 1756 insertions(+), 1769 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index beb340c..23b7f5f 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -524,6 +524,12 @@
<version>1.0-rc2</version>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <version>1.1</version>
+ <scope>provided</scope>
+ </dependency>
<!-- test dependencies -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java
new file mode 100644
index 0000000..8df46dd
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/IsmFormat.java
@@ -0,0 +1,811 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.runners.worker;
+
+import static org.apache.beam.sdk.util.Structs.addLong;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.RandomAccessData;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * An Ism file is a prefix encoded composite key value file broken into shards. Each composite
+ * key is composed of a fixed number of component keys. A fixed number of those sub keys represent
+ * the shard key portion; see {@link IsmRecord} and {@link IsmRecordCoder} for further details
+ * around the data format. In addition to the data, there is a bloom filter,
+ * and multiple indices to allow for efficient retrieval.
+ *
+ * <p>An Ism file is composed of these high level sections (in order):
+ * <ul>
+ * <li>shard block</li>
+ * <li>bloom filter (See {@code ScalableBloomFilter} for details on encoding format)</li>
+ * <li>shard index</li>
+ * <li>footer (See {@link Footer} for details on encoding format)</li>
+ * </ul>
+ *
+ * <p>The shard block is composed of multiple copies of the following:
+ * <ul>
+ * <li>data block</li>
+ * <li>data index</li>
+ * </ul>
+ *
+ * <p>The data block is composed of multiple copies of the following:
+ * <ul>
+ * <li>key prefix (See {@link KeyPrefix} for details on encoding format)</li>
+ * <li>unshared key bytes</li>
+ * <li>value bytes</li>
+ * <li>optional 0x00 0x00 bytes followed by metadata bytes
+ * (if the following 0x00 0x00 bytes are not present, then there are no metadata bytes)</li>
+ * </ul>
+ * <p>Each key written into the data block must be in unsigned lexicographically increasing order
+ * and also its shard portion of the key must hash to the same shard id as all other keys
+ * within the same data block. The hashing function used is the
+ * <a href="https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp">
+ * 32-bit murmur3 algorithm, x86 variant</a> (little-endian variant),
+ * using {@code 1225801234} as the seed value.
+ *
+ * <p>The data index is composed of {@code N} copies of the following:
+ * <ul>
+ * <li>key prefix (See {@link KeyPrefix} for details on encoding format)</li>
+ * <li>unshared key bytes</li>
+ * <li>byte offset to key prefix in data block (variable length long coding)</li>
+ * </ul>
+ *
+ * <p>The shard index is composed of a {@link VarInt variable length integer} encoding representing
+ * the number of shard index records followed by that many shard index records.
+ * See {@link IsmShardCoder} for further details as to its encoding scheme.
+ */
+public class IsmFormat {
+ /**
+ * Seed for the murmur3 hash that assigns keys to shards. It is part of the documented file
+ * format (see the class Javadoc); changing it changes which shard every key maps to.
+ */
+ private static final int HASH_SEED = 1225801234;
+ /** 32-bit murmur3 hash function seeded with {@link #HASH_SEED}. */
+ private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32(HASH_SEED);
+ /**
+ * Mask applied to the hash to select a value shard in [0, 127]. Metadata keys are additionally
+ * offset by {@code SHARD_BITS + 1}, which keeps metadata shards in [128, 255].
+ */
+ public static final int SHARD_BITS = 0x7F; // [0-127] shards + [128-255] metadata shards
+
+ /**
+ * A record containing a composite key and either a value or metadata. The composite key
+ * must not contain the metadata key component place holder if producing a value record, and must
+ * contain the metadata component key place holder if producing a metadata record.
+ *
+ * <p>The composite key is a fixed number of component keys where the first {@code N} component
+ * keys are used to create a shard id via hashing. See {@link IsmRecordCoder#hash(List)} for
+ * further details.
+ */
+ @AutoValue
+ public abstract static class IsmRecord<V> {
+ abstract List<?> keyComponents();
+ @Nullable abstract V value();
+ @Nullable abstract byte[] metadata();
+
+ IsmRecord() {} // Prevent public constructor
+
+ /** Returns an IsmRecord with the specified key components and value. */
+ public static <V> IsmRecord<V> of(List<?> keyComponents, V value) {
+ checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
+ checkArgument(!isMetadataKey(keyComponents),
+ "Expected key components to not contain metadata key.");
+ return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, value, null);
+ }
+
+ public static <V> IsmRecord<V> meta(List<?> keyComponents, byte[] metadata) {
+ checkNotNull(metadata);
+ checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
+ checkArgument(isMetadataKey(keyComponents),
+ "Expected key components to contain metadata key.");
+ return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, null, metadata);
+ }
+
+ /** Returns the list of key components. */
+ public List<?> getKeyComponents() {
+ return keyComponents();
+ }
+
+ /** Returns the key component at the specified index. */
+ public Object getKeyComponent(int index) {
+ return keyComponents().get(index);
+ }
+
+ /**
+ * Returns the value. Throws {@link IllegalStateException} if this is not a
+ * value record.
+ */
+ public V getValue() {
+ checkState(!isMetadataKey(keyComponents()),
+ "This is a metadata record and not a value record.");
+ return value();
+ }
+
+ /**
+ * Returns the metadata. Throws {@link IllegalStateException} if this is not a
+ * metadata record.
+ */
+ public byte[] getMetadata() {
+ checkState(isMetadataKey(keyComponents()),
+ "This is a value record and not a metadata record.");
+ return metadata();
+ }
+ }
+
+  /**
+   * A {@link Coder} for {@link IsmRecord}s.
+   *
+   * <p>Note that this coder standalone will not produce an Ism file. This coder can be used
+   * to materialize a {@link PCollection} of {@link IsmRecord}s. Only when this coder
+   * is combined with an {@code IsmSink} will one produce an Ism file.
+   *
+   * <p>The {@link IsmRecord} encoded format is:
+   * <ul>
+   *   <li>encoded key component 1 using key component coder 1</li>
+   *   <li>...</li>
+   *   <li>encoded key component N using key component coder N</li>
+   *   <li>encoded value using value coder, or for a metadata record the metadata bytes
+   *       encoded using {@link ByteArrayCoder}</li>
+   * </ul>
+   */
+  public static class IsmRecordCoder<V>
+      extends StandardCoder<IsmRecord<V>> {
+    /**
+     * Returns an IsmRecordCoder with the specified key component coders, value coder.
+     *
+     * @param numberOfShardKeyCoders number of leading key components hashed to pick the shard
+     *     of a value record; must be in {@code [1, keyComponentCoders.size()]}
+     * @param numberOfMetadataShardKeyCoders number of leading key components hashed to pick the
+     *     shard of a metadata record; must not exceed {@code keyComponentCoders.size()}
+     * @param keyComponentCoders one coder per key component, in key order; must be non-empty
+     * @param valueCoder coder for the value of value records
+     * @throws NullPointerException if {@code keyComponentCoders} is null
+     * @throws IllegalArgumentException if any of the size constraints above is violated
+     */
+    public static <V> IsmRecordCoder<V> of(
+        int numberOfShardKeyCoders,
+        int numberOfMetadataShardKeyCoders,
+        List<Coder<?>> keyComponentCoders,
+        Coder<V> valueCoder) {
+      checkNotNull(keyComponentCoders);
+      checkArgument(keyComponentCoders.size() > 0,
+          "Expected at least one key component coder.");
+      checkArgument(numberOfShardKeyCoders > 0,
+          "Expected a positive number of shard key coders but received %s.",
+          numberOfShardKeyCoders);
+      checkArgument(numberOfShardKeyCoders <= keyComponentCoders.size(),
+          "Expected at most %s shard key coder(s) but received %s.",
+          keyComponentCoders.size(), numberOfShardKeyCoders);
+      checkArgument(numberOfMetadataShardKeyCoders <= keyComponentCoders.size(),
+          "Expected at most %s metadata shard key coder(s) but received %s.",
+          keyComponentCoders.size(), numberOfMetadataShardKeyCoders);
+      return new IsmRecordCoder<>(
+          numberOfShardKeyCoders,
+          numberOfMetadataShardKeyCoders,
+          keyComponentCoders,
+          valueCoder);
+    }
+
+    /**
+     * Returns an IsmRecordCoder with the specified coders. Note that this method is not meant
+     * to be called by users but used by Jackson when decoding this coder.
+     */
+    @JsonCreator
+    public static IsmRecordCoder<?> of(
+        @JsonProperty(PropertyNames.NUM_SHARD_CODERS) int numberOfShardCoders,
+        @JsonProperty(PropertyNames.NUM_METADATA_SHARD_CODERS) int numberOfMetadataShardCoders,
+        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
+      // The component list mirrors getCoderArguments(): key component coders, then value coder.
+      Preconditions.checkArgument(components.size() >= 2,
+          "Expecting at least 2 components, got " + components.size());
+      return of(
+          numberOfShardCoders,
+          numberOfMetadataShardCoders,
+          components.subList(0, components.size() - 1),
+          components.get(components.size() - 1));
+    }
+
+    private final int numberOfShardKeyCoders;
+    private final int numberOfMetadataShardKeyCoders;
+    private final List<Coder<?>> keyComponentCoders;
+    private final Coder<V> valueCoder;
+
+    private IsmRecordCoder(
+        int numberOfShardKeyCoders,
+        int numberOfMetadataShardKeyCoders,
+        List<Coder<?>> keyComponentCoders, Coder<V> valueCoder) {
+      this.numberOfShardKeyCoders = numberOfShardKeyCoders;
+      this.numberOfMetadataShardKeyCoders = numberOfMetadataShardKeyCoders;
+      this.keyComponentCoders = keyComponentCoders;
+      this.valueCoder = valueCoder;
+    }
+
+    /** Returns the list of key component coders. */
+    public List<Coder<?>> getKeyComponentCoders() {
+      return keyComponentCoders;
+    }
+
+    /** Returns the key coder at the specified index. */
+    public Coder getKeyComponentCoder(int index) {
+      return keyComponentCoders.get(index);
+    }
+
+    /** Returns the value coder. */
+    public Coder<V> getValueCoder() {
+      return valueCoder;
+    }
+
+    @Override
+    public void encode(IsmRecord<V> value, OutputStream outStream,
+        Coder.Context context) throws CoderException, IOException {
+      if (value.getKeyComponents().size() != keyComponentCoders.size()) {
+        throw new CoderException(String.format(
+            "Expected %s key component(s) but received key component(s) %s.",
+            keyComponentCoders.size(), value.getKeyComponents()));
+      }
+      for (int i = 0; i < keyComponentCoders.size(); ++i) {
+        getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream, context.nested());
+      }
+      if (isMetadataKey(value.getKeyComponents())) {
+        ByteArrayCoder.of().encode(value.getMetadata(), outStream, context.nested());
+      } else {
+        valueCoder.encode(value.getValue(), outStream, context.nested());
+      }
+    }
+
+    @Override
+    public IsmRecord<V> decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size());
+      for (Coder<?> keyCoder : keyComponentCoders) {
+        keyComponents.add(keyCoder.decode(inStream, context.nested()));
+      }
+      if (isMetadataKey(keyComponents)) {
+        return IsmRecord.<V>meta(
+            keyComponents, ByteArrayCoder.of().decode(inStream, context.nested()));
+      } else {
+        return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream, context.nested()));
+      }
+    }
+
+    /**
+     * Returns how many leading key components are hashed to pick the shard of the given key.
+     * This is the metadata shard key count if {@code keyComponents} contains the metadata key,
+     * and the value shard key count otherwise.
+     */
+    public int getNumberOfShardKeyCoders(List<?> keyComponents) {
+      if (isMetadataKey(keyComponents)) {
+        return numberOfMetadataShardKeyCoders;
+      } else {
+        return numberOfShardKeyCoders;
+      }
+    }
+
+    /**
+     * Computes the shard id for the given key component(s).
+     *
+     * <p>The shard keys are encoded into their byte representations and hashed using the
+     * <a href="https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp">
+     * 32-bit murmur3 algorithm, x86 variant</a> (little-endian variant),
+     * using {@code 1225801234} as the seed value. We ensure that shard ids for
+     * metadata keys and normal keys do not overlap.
+     */
+    public int hash(List<?> keyComponents) {
+      return encodeAndHash(keyComponents, new RandomAccessData(), new ArrayList<Integer>());
+    }
+
+    /**
+     * Computes the shard id for the given key component(s).
+     *
+     * <p>Mutates {@code keyBytesToMutate} such that when returned, it contains the encoded
+     * version of the key components.
+     */
+    public int encodeAndHash(List<?> keyComponents, RandomAccessData keyBytesToMutate) {
+      return encodeAndHash(keyComponents, keyBytesToMutate, new ArrayList<Integer>());
+    }
+
+    /**
+     * Computes the shard id for the given key component(s).
+     *
+     * <p>Writes the encoded key components into {@code keyBytesToMutate}. After each component
+     * is encoded, adds the resulting {@code keyBytesToMutate.size()} to
+     * {@code keyComponentByteOffsetsToMutate}, i.e. the position where that component's
+     * encoding ends.
+     *
+     * <p>Only the first {@link #getNumberOfShardKeyCoders(List)} components are hashed, and
+     * the hash covers {@code keyBytesToMutate} from offset 0. The buffer is therefore
+     * presumably expected to be empty on entry. Value keys map to shard ids
+     * {@code [0, SHARD_BITS]}; metadata keys map to {@code [SHARD_BITS + 1, 2 * SHARD_BITS + 1]}.
+     *
+     * @throws IllegalArgumentException if there are more key components than key component
+     *     coders, or fewer than the number of shard key components that applies to this key
+     * @throws IllegalStateException if encoding a key component fails
+     */
+    public int encodeAndHash(
+        List<?> keyComponents,
+        RandomAccessData keyBytesToMutate,
+        List<Integer> keyComponentByteOffsetsToMutate) {
+      checkNotNull(keyComponents);
+      checkArgument(keyComponents.size() <= keyComponentCoders.size(),
+          "Expected at most %s key component(s) but received %s.",
+          keyComponentCoders.size(), keyComponents);
+
+      final int numberOfKeyCodersToUse;
+      final int shardOffset;
+      if (isMetadataKey(keyComponents)) {
+        numberOfKeyCodersToUse = numberOfMetadataShardKeyCoders;
+        // Place metadata shards above the value shard range so the two never collide.
+        shardOffset = SHARD_BITS + 1;
+      } else {
+        numberOfKeyCodersToUse = numberOfShardKeyCoders;
+        shardOffset = 0;
+      }
+
+      // Report the limit that was actually applied (value vs. metadata shard key count).
+      checkArgument(numberOfKeyCodersToUse <= keyComponents.size(),
+          "Expected at least %s key component(s) but received %s.",
+          numberOfKeyCodersToUse, keyComponents);
+
+      try {
+        // Encode the shard portion; only these bytes contribute to the hash.
+        for (int i = 0; i < numberOfKeyCodersToUse; ++i) {
+          getKeyComponentCoder(i).encode(
+              keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
+          keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
+        }
+        int shardId = HASH_FUNCTION.hashBytes(
+            keyBytesToMutate.array(), 0, keyBytesToMutate.size()).asInt() & SHARD_BITS;
+        shardId += shardOffset;
+
+        // Encode the remainder of the key.
+        for (int i = numberOfKeyCodersToUse; i < keyComponents.size(); ++i) {
+          getKeyComponentCoder(i).encode(
+              keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
+          keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
+        }
+        return shardId;
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            String.format("Failed to hash %s with coder %s", keyComponents, this), e);
+      }
+    }
+
+    @Override
+    public List<Coder<?>> getCoderArguments() {
+      return ImmutableList.<Coder<?>>builder()
+          .addAll(keyComponentCoders)
+          .add(valueCoder)
+          .build();
+    }
+
+    @Override
+    public CloudObject asCloudObject() {
+      CloudObject cloudObject = super.asCloudObject();
+      addLong(cloudObject, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders);
+      addLong(cloudObject, PropertyNames.NUM_METADATA_SHARD_CODERS, numberOfMetadataShardKeyCoders);
+      return cloudObject;
+    }
+
+    @Override
+    public void verifyDeterministic() throws Coder.NonDeterministicException {
+      verifyDeterministic("Key component coders expected to be deterministic.", keyComponentCoders);
+      verifyDeterministic("Value coder expected to be deterministic.", valueCoder);
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      for (Coder<?> keyComponentCoder : keyComponentCoders) {
+        if (!keyComponentCoder.consistentWithEquals()) {
+          return false;
+        }
+      }
+      return valueCoder.consistentWithEquals();
+    }
+
+    @Override
+    public Object structuralValue(IsmRecord<V> record) throws Exception {
+      // Check null up front; the size check below dereferences the record, which made the
+      // former "record != null" test unreachable.
+      checkNotNull(record);
+      checkState(record.getKeyComponents().size() == keyComponentCoders.size(),
+          "Expected the number of key component coders %s "
+          + "to match the number of key components %s.",
+          keyComponentCoders.size(), record.getKeyComponents());
+
+      if (consistentWithEquals()) {
+        ArrayList<Object> keyComponentStructuralValues =
+            new ArrayList<>(keyComponentCoders.size());
+        for (int i = 0; i < keyComponentCoders.size(); ++i) {
+          keyComponentStructuralValues.add(
+              getKeyComponentCoder(i).structuralValue(record.getKeyComponent(i)));
+        }
+        if (isMetadataKey(record.getKeyComponents())) {
+          return IsmRecord.meta(keyComponentStructuralValues, record.getMetadata());
+        } else {
+          return IsmRecord.of(keyComponentStructuralValues,
+              valueCoder.structuralValue(record.getValue()));
+        }
+      }
+      return super.structuralValue(record);
+    }
+  }
+
+  /**
+   * Checks that every key component coder of {@code coder} is deterministic.
+   *
+   * @throws IllegalArgumentException for the first key component coder that is not
+   *     deterministic, with its {@link NonDeterministicException} attached as the cause
+   */
+  public static void validateCoderIsCompatible(IsmRecordCoder<?> coder) {
+    for (Coder<?> componentCoder : coder.getKeyComponentCoders()) {
+      requireDeterministicKeyCoder(componentCoder);
+    }
+  }
+
+  /** Rethrows a {@link NonDeterministicException} from {@code keyCoder} as an IAE. */
+  private static void requireDeterministicKeyCoder(Coder<?> keyCoder) {
+    try {
+      keyCoder.verifyDeterministic();
+    } catch (NonDeterministicException cause) {
+      String message = String.format(
+          "Key component coder %s is expected to be deterministic.", keyCoder);
+      throw new IllegalArgumentException(message, cause);
+    }
+  }
+
+ /** Returns true if and only if any of the passed in key components represent a metadata key. */
+ public static boolean isMetadataKey(List<?> keyComponents) {
+ for (Object keyComponent : keyComponents) {
+ if (keyComponent == METADATA_KEY) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /** A marker object representing the wildcard metadata key component. */
+ private static final Object METADATA_KEY = new Object() {
+ @Override
+ public String toString() {
+ return "META";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return this == obj;
+ }
+
+ @Override
+ public int hashCode() {
+ return -1248902349;
+ }
+ };
+
+ /**
+ * An object representing a wild card for a key component.
+ * Encoded using {@link MetadataKeyCoder}.
+ */
+ public static Object getMetadataKey() {
+ return METADATA_KEY;
+ }
+
+  /**
+   * A coder for metadata key component. Can be used to wrap key component coder allowing for
+   * the metadata key component to be used as a place holder instead of an actual key.
+   *
+   * <p>Each key is written as one marker byte: {@code 0} for the metadata key, or {@code 1}
+   * followed by the key encoded with the wrapped key coder.
+   */
+  public static class MetadataKeyCoder<K> extends StandardCoder<K> {
+    /** Returns a MetadataKeyCoder wrapping {@code keyCoder}, which must not be null. */
+    public static <K> MetadataKeyCoder<K> of(Coder<K> keyCoder) {
+      checkNotNull(keyCoder);
+      return new MetadataKeyCoder<>(keyCoder);
+    }
+
+    /**
+     * Returns a MetadataKeyCoder with the specified coders. Note that this method is not meant
+     * to be called by users but used by Jackson when decoding this coder.
+     */
+    @JsonCreator
+    public static MetadataKeyCoder<?> of(
+        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
+      Preconditions.checkArgument(components.size() == 1,
+          "Expecting one component, got " + components.size());
+      return of(components.get(0));
+    }
+
+    private final Coder<K> keyCoder;
+
+    private MetadataKeyCoder(Coder<K> keyCoder) {
+      this.keyCoder = keyCoder;
+    }
+
+    /** Returns the wrapped coder used for keys other than the metadata key. */
+    public Coder<K> getKeyCoder() {
+      return keyCoder;
+    }
+
+    @Override
+    public void encode(K value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      if (value == METADATA_KEY) {
+        outStream.write(0);
+      } else {
+        outStream.write(1);
+        keyCoder.encode(value, outStream, context.nested());
+      }
+    }
+
+    @Override
+    public K decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      int marker = inStream.read();
+      if (marker == 0) {
+        // Unchecked: the sentinel is returned typed as K. Callers must recognize it with
+        // isMetadataKey rather than use it as a real K.
+        @SuppressWarnings("unchecked")
+        K metadataKey = (K) getMetadataKey();
+        return metadataKey;
+      } else if (marker == 1) {
+        return keyCoder.decode(inStream, context.nested());
+      } else if (marker == -1) {
+        throw new CoderException("Unexpected end of stream while reading metadata key marker.");
+      } else {
+        throw new CoderException(String.format("Expected marker but got %s.", marker));
+      }
+    }
+
+    @Override
+    public List<Coder<?>> getCoderArguments() {
+      return ImmutableList.<Coder<?>>of(keyCoder);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      verifyDeterministic("Expected key coder to be deterministic", keyCoder);
+    }
+  }
+
+ /**
+ * A shard descriptor containing shard id, the data block offset, and the index offset for the
+ * given shard.
+ */
+ @AutoValue
+ public abstract static class IsmShard {
+ abstract int id();
+ abstract long blockOffset();
+ abstract long indexOffset();
+
+ IsmShard() {}
+
+ /** Returns an IsmShard with the given id, block offset and no index offset. */
+ public static IsmShard of(int id, long blockOffset) {
+ IsmShard ismShard = new AutoValue_IsmFormat_IsmShard(id, blockOffset, -1);
+ checkState(id >= 0,
+ "%s attempting to be written with negative shard id.",
+ ismShard);
+ checkState(blockOffset >= 0,
+ "%s attempting to be written with negative block offset.",
+ ismShard);
+ return ismShard;
+ }
+
+ /** Returns an IsmShard with the given id, block offset, and index offset. */
+ public static IsmShard of(int id, long blockOffset, long indexOffset) {
+ IsmShard ismShard = new AutoValue_IsmFormat_IsmShard(id, blockOffset, indexOffset);
+ checkState(id >= 0,
+ "%s attempting to be written with negative shard id.",
+ ismShard);
+ checkState(blockOffset >= 0,
+ "%s attempting to be written with negative block offset.",
+ ismShard);
+ checkState(indexOffset >= 0,
+ "%s attempting to be written with negative index offset.",
+ ismShard);
+ return ismShard;
+ }
+
+ /** Return the shard id. */
+ public int getId() {
+ return id();
+ }
+
+ /** Return the absolute position within the Ism file where the data block begins. */
+ public long getBlockOffset() {
+ return blockOffset();
+ }
+
+ /**
+ * Return the absolute position within the Ism file where the index block begins.
+ * Throws {@link IllegalStateException} if the index offset was never specified.
+ */
+ public long getIndexOffset() {
+ checkState(indexOffset() >= 0,
+ "Unable to fetch index offset because it was never specified.");
+ return indexOffset();
+ }
+
+ /** Returns a new IsmShard like this one with the specified index offset. */
+ public IsmShard withIndexOffset(long indexOffset) {
+ return of(id(), blockOffset(), indexOffset);
+ }
+ }
+
+ /**
+ * A {@link ListCoder} wrapping a {@link IsmShardCoder} used to encode the shard index.
+ * See {@link ListCoder} for its encoding specification and {@link IsmShardCoder} for its
+ * encoding specification.
+ */
+ public static final Coder<List<IsmShard>> ISM_SHARD_INDEX_CODER =
+ ListCoder.of(IsmShardCoder.of());
+
+ /**
+ * A coder for {@link IsmShard}s.
+ *
+ * The shard descriptor is encoded as:
+ * <ul>
+ * <li>id (variable length integer encoding)</li>
+ * <li>blockOffset (variable length long encoding)</li>
+ * <li>indexOffset (variable length long encoding)</li>
+ * </ul>
+ */
+ public static class IsmShardCoder extends AtomicCoder<IsmShard> {
+ private static final IsmShardCoder INSTANCE = new IsmShardCoder();
+
+ /** Returns an IsmShardCoder. */
+ @JsonCreator
+ public static IsmShardCoder of() {
+ return INSTANCE;
+ }
+
+ private IsmShardCoder() {
+ }
+
+ @Override
+ public void encode(IsmShard value, OutputStream outStream, Coder.Context context)
+ throws CoderException, IOException {
+ checkState(value.getIndexOffset() >= 0,
+ "%s attempting to be written without index offset.",
+ value);
+ VarIntCoder.of().encode(value.getId(), outStream, context.nested());
+ VarLongCoder.of().encode(value.getBlockOffset(), outStream, context.nested());
+ VarLongCoder.of().encode(value.getIndexOffset(), outStream, context.nested());
+ }
+
+ @Override
+ public IsmShard decode(
+ InputStream inStream, Coder.Context context) throws CoderException, IOException {
+ return IsmShard.of(
+ VarIntCoder.of().decode(inStream, context),
+ VarLongCoder.of().decode(inStream, context),
+ VarLongCoder.of().decode(inStream, context));
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+ }
+
+ /**
+ * The prefix used before each key which contains the number of shared and unshared
+ * bytes from the previous key that was read. The key prefix along with the previous key
+ * and the unshared key bytes allows one to construct the current key by doing the following
+ * {@code currentKey = previousKey[0 : sharedBytes] + read(unsharedBytes)}.
+ *
+ * <p>The key prefix is encoded as:
+ * <ul>
+ * <li>number of shared key bytes (variable length integer coding)</li>
+ * <li>number of unshared key bytes (variable length integer coding)</li>
+ * </ul>
+ */
+ @AutoValue
+ public abstract static class KeyPrefix {
+ public abstract int getSharedKeySize();
+ public abstract int getUnsharedKeySize();
+
+ public static KeyPrefix of(int sharedKeySize, int unsharedKeySize) {
+ return new AutoValue_IsmFormat_KeyPrefix(sharedKeySize, unsharedKeySize);
+ }
+ }
+
+ /** A {@link Coder} for {@link KeyPrefix}. */
+ public static final class KeyPrefixCoder extends AtomicCoder<KeyPrefix> {
+ private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder();
+
+ @JsonCreator
+ public static KeyPrefixCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(KeyPrefix value, OutputStream outStream, Coder.Context context)
+ throws CoderException, IOException {
+ VarInt.encode(value.getSharedKeySize(), outStream);
+ VarInt.encode(value.getUnsharedKeySize(), outStream);
+ }
+
+ @Override
+ public KeyPrefix decode(InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ return KeyPrefix.of(VarInt.decodeInt(inStream), VarInt.decodeInt(inStream));
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(KeyPrefix value, Coder.Context context) {
+ return true;
+ }
+
+ @Override
+ public long getEncodedElementByteSize(KeyPrefix value, Coder.Context context)
+ throws Exception {
+ Preconditions.checkNotNull(value);
+ return VarInt.getLength(value.getSharedKeySize())
+ + VarInt.getLength(value.getUnsharedKeySize());
+ }
+ }
+
+  /**
+   * The footer stores the relevant information required to locate the index and bloom filter.
+   * It also stores a version byte and the number of keys stored.
+   *
+   * <p>The footer is encoded as exactly {@link #FIXED_LENGTH} bytes containing, in order:
+   * <ul>
+   *   <li>start of shard index position offset (big endian long coding)</li>
+   *   <li>start of bloom filter offset (big endian long coding)</li>
+   *   <li>number of keys in file (big endian long coding)</li>
+   *   <li>{@link #VERSION}, currently 0x02 (version key as a single byte)</li>
+   * </ul>
+   */
+  @AutoValue
+  public abstract static class Footer {
+    /** Size in bytes of each big endian long field. */
+    public static final int LONG_BYTES = 8;
+    /** Encoded size of a footer: three longs followed by one version byte. */
+    public static final int FIXED_LENGTH = 3 * LONG_BYTES + 1;
+    /** The footer version that FooterCoder writes and the only one it accepts when decoding. */
+    public static final byte VERSION = 2;
+
+    /** Returns the footer version; {@link #VERSION} for every footer created by {@code of}. */
+    public abstract byte getVersion();
+    /** Returns the offset at which the shard index starts. */
+    public abstract long getIndexPosition();
+    /** Returns the offset at which the bloom filter starts. */
+    public abstract long getBloomFilterPosition();
+    /** Returns the number of keys stored in the file. */
+    public abstract long getNumberOfKeys();
+
+    /** Returns a footer with the current {@link #VERSION} and the given offsets and key count. */
+    public static Footer of(long indexPosition, long bloomFilterPosition, long numberOfKeys) {
+      return new AutoValue_IsmFormat_Footer(
+          VERSION, indexPosition, bloomFilterPosition, numberOfKeys);
+    }
+  }
+
+  /**
+   * A {@link Coder} for {@link Footer}s.
+   *
+   * <p>Encodes exactly {@link Footer#FIXED_LENGTH} bytes. The index position, bloom filter
+   * position and number of keys are written as big endian longs, followed by the single
+   * {@link Footer#VERSION} byte. Decoding rejects any other version.
+   */
+  public static final class FooterCoder extends AtomicCoder<Footer> {
+    private static final FooterCoder INSTANCE = new FooterCoder();
+
+    /** Returns the shared FooterCoder instance. */
+    @JsonCreator
+    public static FooterCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(Footer value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      // DataOutputStream writes straight through to outStream, so no flush is needed.
+      DataOutputStream dataOut = new DataOutputStream(outStream);
+      dataOut.writeLong(value.getIndexPosition());
+      dataOut.writeLong(value.getBloomFilterPosition());
+      dataOut.writeLong(value.getNumberOfKeys());
+      dataOut.write(Footer.VERSION);
+    }
+
+    /**
+     * Decodes a footer.
+     *
+     * @throws java.io.EOFException if the stream ends before all {@link Footer#FIXED_LENGTH}
+     *     bytes are read
+     * @throws IOException if the version byte is not {@link Footer#VERSION}
+     */
+    @Override
+    public Footer decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      DataInputStream dataIn = new DataInputStream(inStream);
+      Footer footer = Footer.of(dataIn.readLong(), dataIn.readLong(), dataIn.readLong());
+      // readUnsignedByte() throws EOFException on a truncated footer. read() would instead
+      // return -1, which would then be misreported as an unknown version.
+      int version = dataIn.readUnsignedByte();
+      if (version != Footer.VERSION) {
+        throw new IOException("Unknown version " + version + ". "
+            + "Only version " + Footer.VERSION + " is currently supported.");
+      }
+      return footer;
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(Footer value, Coder.Context context) {
+      return true;
+    }
+
+    @Override
+    public long getEncodedElementByteSize(Footer value, Coder.Context context)
+        throws Exception {
+      return Footer.FIXED_LENGTH;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java
new file mode 100644
index 0000000..6133148
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/worker/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Implementation of the harness that runs on each Google Compute Engine instance to coordinate
+ * execution of Pipeline code.
+ */
+@ParametersAreNonnullByDefault
+package org.apache.beam.sdk.runners.worker;
+
+import javax.annotation.ParametersAreNonnullByDefault;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
new file mode 100644
index 0000000..ae19a17
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import java.io.Serializable;
+
+/**
+ * Serializable bundle of a {@link DoFn} together with what is needed to run it: its
+ * {@link WindowingStrategy} and, optionally, the side input views it reads and the
+ * {@link Coder} of its main input.
+ *
+ * @param <InputT> the type of the (main) input elements of the DoFn
+ * @param <OutputT> the type of the (main) output elements of the DoFn
+ */
+public class DoFnInfo<InputT, OutputT> implements Serializable {
+  private final DoFn<InputT, OutputT> doFn;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  // The next two fields are null when the two-argument constructor was used.
+  private final Iterable<PCollectionView<?>> sideInputViews;
+  private final Coder<InputT> inputCoder;
+
+  /**
+   * Creates a {@code DoFnInfo} that carries no side input views and no input coder; both
+   * {@link #getSideInputViews()} and {@link #getInputCoder()} will return {@code null}.
+   */
+  public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) {
+    this(doFn, windowingStrategy, null, null);
+  }
+
+  /** Creates a {@code DoFnInfo} from every component, stored as given. */
+  public DoFnInfo(
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Iterable<PCollectionView<?>> sideInputViews,
+      Coder<InputT> inputCoder) {
+    this.doFn = doFn;
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputViews = sideInputViews;
+    this.inputCoder = inputCoder;
+  }
+
+  /** Returns the wrapped {@link DoFn}. */
+  public DoFn<InputT, OutputT> getDoFn() {
+    return doFn;
+  }
+
+  /** Returns the windowing strategy supplied at construction. */
+  public WindowingStrategy<?, ?> getWindowingStrategy() {
+    return windowingStrategy;
+  }
+
+  /** Returns the side input views, or {@code null} if none were supplied. */
+  public Iterable<PCollectionView<?>> getSideInputViews() {
+    return sideInputViews;
+  }
+
+  /** Returns the main input coder, or {@code null} if none was supplied. */
+  public Coder<InputT> getInputCoder() {
+    return inputCoder;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/OutputReference.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/OutputReference.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/OutputReference.java
new file mode 100644
index 0000000..5e30172
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/OutputReference.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.api.client.util.Preconditions.checkNotNull;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.util.Key;
+
+/**
+ * JSON-serializable pointer, used by {@link com.google.api.services.dataflow.model.Step}s, to a
+ * named output of another {@code Step}. Its fields are mapped to the JSON keys
+ * {@code "@type"}, {@code "step_name"} and {@code "output_name"} via {@link Key}.
+ */
+public final class OutputReference extends GenericJson {
+  /** Type discriminator, always {@code "OutputReference"}. */
+  @Key("@type")
+  public final String type = "OutputReference";
+
+  /** Name of the step producing the referenced output. */
+  @Key("step_name")
+  private final String stepName;
+
+  /** Name of the referenced output within that step. */
+  @Key("output_name")
+  private final String outputName;
+
+  /**
+   * Creates a reference to output {@code outputName} of step {@code stepName}.
+   *
+   * @throws NullPointerException if either argument is {@code null}
+   */
+  public OutputReference(String stepName, String outputName) {
+    String validatedStep = checkNotNull(stepName);
+    this.stepName = validatedStep;
+    String validatedOutput = checkNotNull(outputName);
+    this.outputName = validatedOutput;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java
new file mode 100644
index 0000000..2d902f4
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/RandomAccessData.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.UnsignedBytes;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * An elastic-sized byte array which allows you to manipulate it as a stream, or access
+ * it directly. This allows for a quick succession of moving bytes from an {@link InputStream}
+ * to this wrapper to be used as an {@link OutputStream} and vice versa. This wrapper
+ * also provides random access to bytes stored within. This wrapper allows users to finely
+ * control the number of byte copies that occur.
+ *
+ * <p>Only the first {@link #size()} bytes of the backing array are valid; anything stored from
+ * offset {@link #size()} onwards is considered temporary unused storage.
+ */
+@NotThreadSafe
+public class RandomAccessData {
+  /**
+   * A {@link Coder} which encodes the valid parts of this stream.
+   * This follows the same encoding scheme as {@link ByteArrayCoder}.
+   * This coder is deterministic and consistent with equals.
+   *
+   * <p>This coder does not support encoding positive infinity.
+   */
+  public static class RandomAccessDataCoder extends AtomicCoder<RandomAccessData> {
+    private static final RandomAccessDataCoder INSTANCE = new RandomAccessDataCoder();
+
+    /** Returns the shared instance of this coder. */
+    @JsonCreator
+    public static RandomAccessDataCoder of() {
+      return INSTANCE;
+    }
+
+    /**
+     * Writes the valid bytes of {@code value}. Outside of the whole-stream context the bytes are
+     * preceded by their {@link VarInt}-encoded count so that the value can be nested.
+     *
+     * @throws CoderException if {@code value} is the positive infinity token
+     */
+    @Override
+    public void encode(RandomAccessData value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      if (value == POSITIVE_INFINITY) {
+        throw new CoderException("Positive infinity can not be encoded.");
+      }
+      if (!context.isWholeStream) {
+        VarInt.encode(value.size, outStream);
+      }
+      value.writeTo(outStream, 0, value.size);
+    }
+
+    /**
+     * Reads a value written by {@link #encode}. In the whole-stream context every remaining byte
+     * of {@code inStream} becomes part of the result.
+     */
+    @Override
+    public RandomAccessData decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      RandomAccessData rval = new RandomAccessData();
+      if (!context.isWholeStream) {
+        int length = VarInt.decodeInt(inStream);
+        rval.readFrom(inStream, 0, length);
+      } else {
+        ByteStreams.copy(inStream, rval.asOutputStream());
+      }
+      return rval;
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+
+    /** Always cheap: the encoded size is derived from {@link RandomAccessData#size()} alone. */
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(
+        RandomAccessData value, Coder.Context context) {
+      return true;
+    }
+
+    /**
+     * Returns the number of bytes {@link #encode} writes for {@code value}.
+     *
+     * @throws CoderException if {@code value} is {@code null} or the positive infinity token,
+     *     neither of which can be encoded
+     */
+    @Override
+    protected long getEncodedElementByteSize(RandomAccessData value, Coder.Context context)
+        throws Exception {
+      if (value == null) {
+        throw new CoderException("cannot encode a null in memory stream");
+      }
+      // Keep in sync with encode(): a value that cannot be encoded has no encoded size.
+      if (value == POSITIVE_INFINITY) {
+        throw new CoderException("Positive infinity can not be encoded.");
+      }
+      long size = 0;
+      if (!context.isWholeStream) {
+        size += VarInt.getLength(value.size);
+      }
+      return size + value.size;
+    }
+  }
+
+  /** Shared instance of {@link UnsignedLexicographicalComparator}. */
+  public static final UnsignedLexicographicalComparator UNSIGNED_LEXICOGRAPHICAL_COMPARATOR =
+      new UnsignedLexicographicalComparator();
+
+  /**
+   * A {@link Comparator} that compares the valid bytes of two {@link RandomAccessData}
+   * lexicographically, treating every byte as an unsigned value. The first pair of differing
+   * bytes decides the order; if one array is a prefix of the other, the shorter array is the
+   * lesser. For example,
+   * {@code [] < [0x01] < [0x01, 0x7F] < [0x01, 0x80] < [0x02] < POSITIVE INFINITY}.
+   *
+   * <p>Note that a token type of positive infinity is supported and is greater than
+   * all other {@link RandomAccessData}.
+   */
+  public static final class UnsignedLexicographicalComparator
+      implements Comparator<RandomAccessData> {
+    // Only the shared UNSIGNED_LEXICOGRAPHICAL_COMPARATOR instance is created.
+    private UnsignedLexicographicalComparator() {
+    }
+
+    @Override
+    public int compare(RandomAccessData o1, RandomAccessData o2) {
+      return compare(o1, o2, 0 /* start from the beginning */);
+    }
+
+    /**
+     * Compares the two sets of bytes, ignoring the bytes before {@code startOffset}. If the
+     * remaining overlapping bytes are equal, the shorter value is the lesser.
+     */
+    public int compare(RandomAccessData o1, RandomAccessData o2, int startOffset) {
+      if (o1 == o2) {
+        return 0;
+      }
+      if (o1 == POSITIVE_INFINITY) {
+        return 1;
+      }
+      if (o2 == POSITIVE_INFINITY) {
+        return -1;
+      }
+
+      int minBytesLen = Math.min(o1.size, o2.size);
+      for (int i = startOffset; i < minBytesLen; i++) {
+        // unsigned comparison
+        int b1 = o1.buffer[i] & 0xFF;
+        int b2 = o2.buffer[i] & 0xFF;
+        if (b1 == b2) {
+          continue;
+        }
+        // Return the stream with the smaller byte as the smaller value.
+        return b1 - b2;
+      }
+      // If one is a prefix of the other, return the shorter one as the smaller one.
+      // If both lengths are equal, then both streams are equal.
+      return o1.size - o2.size;
+    }
+
+    /**
+     * Computes the length of the common prefix of the valid bytes of the two values. The
+     * positive infinity token is created empty, so its common prefix with anything is 0.
+     */
+    public int commonPrefixLength(RandomAccessData o1, RandomAccessData o2) {
+      int minBytesLen = Math.min(o1.size, o2.size);
+      for (int i = 0; i < minBytesLen; i++) {
+        // unsigned comparison
+        int b1 = o1.buffer[i] & 0xFF;
+        int b2 = o2.buffer[i] & 0xFF;
+        if (b1 != b2) {
+          return i;
+        }
+      }
+      return minBytesLen;
+    }
+  }
+
+  /**
+   * A token type representing positive infinity. It is recognized by reference identity; its
+   * contents are empty and must not be modified.
+   */
+  static final RandomAccessData POSITIVE_INFINITY = new RandomAccessData(0);
+
+  /**
+   * Returns a copy of this value, of the same length, in which the last byte that is not
+   * {@code 0xFF} has been incremented by one. Any trailing {@code 0xFF} bytes are left unchanged
+   * (for example {@code [0x00, 0xFF]} becomes {@code [0x01, 0xFF]}). The result is strictly
+   * greater than this under {@link UnsignedLexicographicalComparator}. If this is empty or
+   * consists only of {@code 0xFF} bytes, the token {@link #POSITIVE_INFINITY} is returned.
+   *
+   * <p>The {@link UnsignedLexicographicalComparator} supports comparing {@link RandomAccessData}
+   * with support for positive infinity.
+   *
+   * <p>NOTE(review): because trailing {@code 0xFF} bytes are not reset to {@code 0x00}, the
+   * result is not necessarily the smallest same-length value greater than this. Confirm that
+   * callers do not rely on that stronger property.
+   */
+  public RandomAccessData increment() throws IOException {
+    RandomAccessData copy = copy();
+    for (int i = copy.size - 1; i >= 0; --i) {
+      if (copy.buffer[i] != UnsignedBytes.MAX_VALUE) {
+        copy.buffer[i] = UnsignedBytes.checkedCast(UnsignedBytes.toInt(copy.buffer[i]) + 1);
+        return copy;
+      }
+    }
+    return POSITIVE_INFINITY;
+  }
+
+  private static final int DEFAULT_INITIAL_BUFFER_SIZE = 128;
+
+  /** Constructs an empty RandomAccessData with a default buffer size. */
+  public RandomAccessData() {
+    this(DEFAULT_INITIAL_BUFFER_SIZE);
+  }
+
+  /**
+   * Constructs a RandomAccessData that uses {@code initialBuffer} as its backing array, without
+   * copying it. All of its bytes are valid.
+   */
+  public RandomAccessData(byte[] initialBuffer) {
+    checkNotNull(initialBuffer);
+    this.buffer = initialBuffer;
+    this.size = initialBuffer.length;
+  }
+
+  /**
+   * Constructs an empty RandomAccessData whose backing array has the given capacity.
+   *
+   * @throws IllegalArgumentException if {@code initialBufferSize} is negative
+   */
+  public RandomAccessData(int initialBufferSize) {
+    checkArgument(initialBufferSize >= 0,
+        "Expected initial buffer size to be non-negative, but was %s.", initialBufferSize);
+    this.buffer = new byte[initialBufferSize];
+  }
+
+  private byte[] buffer;
+  private int size;
+
+  /** Returns the backing array; only its first {@link #size()} bytes are valid. */
+  public byte[] array() {
+    return buffer;
+  }
+
+  /** Returns the number of bytes in the backing array that are valid. */
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Resets the end of the stream to the specified position, growing the backing array if
+   * needed. Bytes between the previous end and {@code position} are not cleared: newly allocated
+   * bytes are zero, while bytes within the existing capacity keep whatever was written earlier.
+   *
+   * @throws IllegalArgumentException if {@code position} is negative
+   */
+  public void resetTo(int position) {
+    checkArgument(position >= 0, "Expected position to be non-negative, but was %s.", position);
+    ensureCapacity(position);
+    size = position;
+  }
+
+  // Appends at the current end of the valid data, growing the backing array as required.
+  private final OutputStream outputStream = new OutputStream() {
+    @Override
+    public void write(int b) throws IOException {
+      ensureCapacity(size + 1);
+      buffer[size] = (byte) b;
+      size += 1;
+    }
+
+    @Override
+    public void write(byte[] b, int offset, int length) throws IOException {
+      ensureCapacity(size + length);
+      System.arraycopy(b, offset, buffer, size, length);
+      size += length;
+    }
+  };
+
+  /**
+   * Returns an output stream which writes to the backing buffer from the current position.
+   * Note that the internal buffer will grow as required to accommodate all data written.
+   */
+  public OutputStream asOutputStream() {
+    return outputStream;
+  }
+
+  /**
+   * Returns an {@link InputStream} wrapper which supplies the portion of this backing byte buffer
+   * starting at {@code offset} and up to {@code length} bytes. Note that the returned
+   * {@link InputStream} is only a wrapper and any modifications to the underlying
+   * {@link RandomAccessData} will be visible by the {@link InputStream}.
+   */
+  public InputStream asInputStream(final int offset, final int length) {
+    return new ByteArrayInputStream(buffer, offset, length);
+  }
+
+  /**
+   * Writes {@code length} bytes starting at {@code offset} from the backing data store to the
+   * specified output stream.
+   */
+  public void writeTo(OutputStream out, int offset, int length) throws IOException {
+    out.write(buffer, offset, length);
+  }
+
+  /**
+   * Reads exactly {@code length} bytes from the specified input stream, writing them into the
+   * backing data store starting at {@code offset}. Afterwards {@link #size()} is
+   * {@code offset + length}, which discards any valid bytes previously stored beyond that point.
+   *
+   * <p>Note that the in memory stream will be grown to ensure there is enough capacity.
+   */
+  public void readFrom(InputStream inStream, int offset, int length) throws IOException {
+    ensureCapacity(offset + length);
+    ByteStreams.readFully(inStream, buffer, offset, length);
+    size = offset + length;
+  }
+
+  /** Returns a copy of this RandomAccessData containing only its valid bytes. */
+  public RandomAccessData copy() throws IOException {
+    RandomAccessData copy = new RandomAccessData(size);
+    writeTo(copy.asOutputStream(), 0, size);
+    return copy;
+  }
+
+  /** Two values are equal when they compare equal under the unsigned lexicographic order. */
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    }
+    if (!(other instanceof RandomAccessData)) {
+      return false;
+    }
+    return UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(this, (RandomAccessData) other) == 0;
+  }
+
+  /** Hashes only the valid bytes, consistent with {@link #equals}. */
+  @Override
+  public int hashCode() {
+    int result = 1;
+    for (int i = 0; i < size; ++i) {
+      result = 31 * result + buffer[i];
+    }
+
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("buffer", Arrays.copyOf(buffer, size))
+        .add("size", size)
+        .toString();
+  }
+
+  /** Grows the backing array, preserving its contents, so it holds at least minCapacity bytes. */
+  private void ensureCapacity(int minCapacity) {
+    // If we have enough space, don't grow the buffer.
+    if (minCapacity <= buffer.length) {
+      return;
+    }
+
+    // Try to double the size of the buffer; if that's not enough, just use the new capacity.
+    // Note that we use Math.min(long, long) to not cause overflow on the multiplication.
+    int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.length * 2L);
+    if (newCapacity < minCapacity) {
+      newCapacity = minCapacity;
+    }
+    buffer = Arrays.copyOf(buffer, newCapacity);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/TimeUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/TimeUtil.java
new file mode 100644
index 0000000..db5c760
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/TimeUtil.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.ReadableDuration;
+import org.joda.time.ReadableInstant;
+import org.joda.time.chrono.ISOChronology;
+
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+/**
+ * A helper class for converting between Dataflow API and SDK time
+ * representations.
+ *
+ * <p>Dataflow API times are strings of the form
+ * {@code YYYY-MM-dd'T'HH:mm:ss[.nnnn]'Z'}: that is, RFC 3339
+ * strings with optional fractional seconds and a 'Z' offset.
+ *
+ * <p>Dataflow API durations are strings of the form {@code ['-']sssss[.nnnn]'s'}:
+ * that is, seconds with optional fractional seconds and a literal 's' at the end.
+ *
+ * <p>In both formats, fractional seconds are either three digits (millisecond
+ * resolution), six digits (microsecond resolution), or nine digits (nanosecond
+ * resolution).
+ */
+public final class TimeUtil {
+  private TimeUtil() {} // Non-instantiable.
+
+  // Group 1: optional sign, group 2: whole seconds, group 3: optional fractional digits.
+  private static final Pattern DURATION_PATTERN = Pattern.compile("(-)?(\\d+)(?:\\.(\\d+))?s");
+  private static final Pattern TIME_PATTERN =
+      Pattern.compile("(\\d{4})-(\\d{2})-(\\d{2})T(\\d{2}):(\\d{2}):(\\d{2})(?:\\.(\\d+))?Z");
+
+  /**
+   * Converts a {@link ReadableInstant} into a Dataflow API time value.
+   *
+   * @throws NullPointerException if {@code instant} is {@code null}
+   */
+  public static String toCloudTime(ReadableInstant instant) {
+    // Note that since Joda objects use millisecond resolution, we always
+    // produce either no fractional seconds or fractional seconds with
+    // millisecond resolution.
+
+    // Always render the fields in ISO/UTC: the output carries a literal 'Z' offset, so using the
+    // instant's own chronology (e.g. a DateTime in another zone) would print the wrong time.
+    DateTime time = new DateTime(instant.getMillis(), ISOChronology.getInstanceUTC());
+
+    // Locale.ROOT guarantees ASCII digits regardless of the JVM's default locale.
+    int millis = time.getMillisOfSecond();
+    if (millis == 0) {
+      return String.format(Locale.ROOT, "%04d-%02d-%02dT%02d:%02d:%02dZ",
+          time.getYear(),
+          time.getMonthOfYear(),
+          time.getDayOfMonth(),
+          time.getHourOfDay(),
+          time.getMinuteOfHour(),
+          time.getSecondOfMinute());
+    } else {
+      return String.format(Locale.ROOT, "%04d-%02d-%02dT%02d:%02d:%02d.%03dZ",
+          time.getYear(),
+          time.getMonthOfYear(),
+          time.getDayOfMonth(),
+          time.getHourOfDay(),
+          time.getMinuteOfHour(),
+          time.getSecondOfMinute(),
+          millis);
+    }
+  }
+
+  /**
+   * Converts a time value received via the Dataflow API into the corresponding
+   * {@link Instant}.
+   * @return the parsed time, or null if a parse error occurs (including out-of-range fields
+   *     such as month 13)
+   */
+  @Nullable
+  public static Instant fromCloudTime(String time) {
+    Matcher matcher = TIME_PATTERN.matcher(time);
+    if (!matcher.matches()) {
+      return null;
+    }
+    // The fixed-width groups are at most four ASCII digits, so parseInt cannot fail.
+    int year = Integer.parseInt(matcher.group(1));
+    int month = Integer.parseInt(matcher.group(2));
+    int day = Integer.parseInt(matcher.group(3));
+    int hour = Integer.parseInt(matcher.group(4));
+    int minute = Integer.parseInt(matcher.group(5));
+    int second = Integer.parseInt(matcher.group(6));
+    int millis = 0;
+
+    String frac = matcher.group(7);
+    if (frac != null) {
+      long fracMillis = fractionToMillis(frac);
+      if (fracMillis < 0) {
+        return null;
+      }
+      millis = (int) fracMillis;
+    }
+
+    try {
+      return new DateTime(year, month, day, hour, minute, second, millis,
+          ISOChronology.getInstanceUTC()).toInstant();
+    } catch (IllegalArgumentException e) {
+      // Joda rejects out-of-range fields; per the contract above that is a parse error.
+      return null;
+    }
+  }
+
+  /**
+   * Converts a {@link ReadableDuration} into a Dataflow API duration string. Negative
+   * durations are rendered with a leading {@code '-'}, e.g. {@code "-1.500s"}.
+   */
+  public static String toCloudDuration(ReadableDuration duration) {
+    // Note that since Joda objects use millisecond resolution, we always
+    // produce either no fractional seconds or fractional seconds with
+    // millisecond resolution.
+    long millis = duration.getMillis();
+    // Format the magnitude and prepend the sign, so -1500ms is "-1.500s" rather than "-1.-500s".
+    // Neither Math.abs can overflow: |millis / 1000| <= Long.MAX_VALUE / 1000 and
+    // |millis % 1000| < 1000.
+    String sign = millis < 0 ? "-" : "";
+    long seconds = Math.abs(millis / 1000);
+    long fraction = Math.abs(millis % 1000);
+    if (fraction == 0) {
+      return String.format(Locale.ROOT, "%s%ds", sign, seconds);
+    } else {
+      return String.format(Locale.ROOT, "%s%d.%03ds", sign, seconds, fraction);
+    }
+  }
+
+  /**
+   * Converts a Dataflow API duration string into a {@link Duration}.
+   * @return the parsed duration, or null if a parse error occurs (including values too large
+   *     to represent in milliseconds)
+   */
+  @Nullable
+  public static Duration fromCloudDuration(String duration) {
+    Matcher matcher = DURATION_PATTERN.matcher(duration);
+    if (!matcher.matches()) {
+      return null;
+    }
+    boolean negative = matcher.group(1) != null;
+    long seconds;
+    try {
+      seconds = Long.parseLong(matcher.group(2));
+    } catch (NumberFormatException e) {
+      // More digits than fit in a long: not representable, so treat it as a parse error.
+      return null;
+    }
+    if (seconds > Long.MAX_VALUE / 1000) {
+      return null;
+    }
+    long millis = seconds * 1000;
+    String frac = matcher.group(3);
+    if (frac != null) {
+      long fracMillis = fractionToMillis(frac);
+      if (fracMillis < 0 || millis > Long.MAX_VALUE - fracMillis) {
+        return null;
+      }
+      millis += fracMillis;
+    }
+    return Duration.millis(negative ? -millis : millis);
+  }
+
+  /**
+   * Converts a string of fractional-second digits into whole milliseconds, truncating any
+   * sub-millisecond precision.
+   *
+   * @return the milliseconds, or -1 if {@code frac} is not 3, 6 or 9 digits long
+   */
+  private static long fractionToMillis(String frac) {
+    // The length is checked before parsing, so at most nine digits are parsed (no overflow).
+    switch (frac.length()) {
+      case 3: // millisecond resolution
+        return Long.parseLong(frac);
+      case 6: // microsecond resolution
+        return Long.parseLong(frac) / 1000;
+      case 9: // nanosecond resolution
+        return Long.parseLong(frac) / 1000000;
+      default:
+        return -1;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java
new file mode 100644
index 0000000..b990212
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/RandomAccessDataTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.RandomAccessData.RandomAccessDataCoder;
+
+import com.google.common.primitives.UnsignedBytes;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.Arrays;
+
+/**
+ * Tests for {@link RandomAccessData}.
+ */
+@RunWith(JUnit4.class)
+public class RandomAccessDataTest {
+  private static final byte[] TEST_DATA_A = new byte[]{ 0x01, 0x02, 0x03 };
+  private static final byte[] TEST_DATA_B = new byte[]{ 0x06, 0x05, 0x04, 0x03 };
+  private static final byte[] TEST_DATA_C = new byte[]{ 0x06, 0x05, 0x03, 0x03 };
+
+  /** Shorthand for the comparator under test. */
+  private static final RandomAccessData.UnsignedLexicographicalComparator COMPARATOR =
+      RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR;
+
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  /** Builds a default-sized {@link RandomAccessData} by streaming {@code bytes} into it. */
+  private static RandomAccessData streamOf(byte[] bytes) throws Exception {
+    RandomAccessData data = new RandomAccessData();
+    data.asOutputStream().write(bytes);
+    return data;
+  }
+
+  /** Returns a copy of the valid bytes of {@code data}. */
+  private static byte[] validBytes(RandomAccessData data) {
+    return Arrays.copyOf(data.array(), data.size());
+  }
+
+  @Test
+  public void testCoder() throws Exception {
+    RandomAccessDataCoder coder = RandomAccessDataCoder.of();
+    RandomAccessData streamA = streamOf(TEST_DATA_A);
+    // Deliberately holds the same bytes as streamA.
+    RandomAccessData streamB = streamOf(TEST_DATA_A);
+
+    CoderProperties.coderDecodeEncodeEqual(coder, streamA);
+    CoderProperties.coderDeterministic(coder, streamA, streamB);
+    CoderProperties.coderConsistentWithEquals(coder, streamA, streamB);
+    CoderProperties.coderSerializable(coder);
+    CoderProperties.structuralValueConsistentWithEquals(coder, streamA, streamB);
+
+    assertTrue(coder.isRegisterByteSizeObserverCheap(streamA, Context.NESTED));
+    assertTrue(coder.isRegisterByteSizeObserverCheap(streamA, Context.OUTER));
+    // Nested adds a one-byte VarInt length prefix to the three data bytes.
+    assertEquals(4, coder.getEncodedElementByteSize(streamA, Context.NESTED));
+    assertEquals(3, coder.getEncodedElementByteSize(streamA, Context.OUTER));
+  }
+
+  @Test
+  public void testCoderWithPositiveInfinityIsError() throws Exception {
+    expectedException.expect(CoderException.class);
+    expectedException.expectMessage("Positive infinity can not be encoded");
+    RandomAccessDataCoder.of().encode(
+        RandomAccessData.POSITIVE_INFINITY, new ByteArrayOutputStream(), Context.OUTER);
+  }
+
+  @Test
+  public void testLexicographicalComparator() throws Exception {
+    RandomAccessData streamA = streamOf(TEST_DATA_A);
+    RandomAccessData streamB = streamOf(TEST_DATA_B);
+    RandomAccessData streamC = streamOf(TEST_DATA_C);
+    RandomAccessData infinity = RandomAccessData.POSITIVE_INFINITY;
+
+    assertTrue(COMPARATOR.compare(streamA, streamB) < 0);
+    assertTrue(COMPARATOR.compare(streamB, streamA) > 0);
+    assertEquals(0, COMPARATOR.compare(streamB, streamB));
+
+    // B and C agree on their first two bytes.
+    assertEquals(2, COMPARATOR.commonPrefixLength(streamB, streamC));
+    // B and C differ only at index 2, so they compare equal when starting at index 3.
+    assertEquals(0, COMPARATOR.compare(streamB, streamC, 3));
+
+    // Positive infinity sorts after everything except itself.
+    assertTrue(COMPARATOR.compare(streamA, infinity) < 0);
+    assertEquals(0, COMPARATOR.compare(infinity, infinity));
+    assertTrue(COMPARATOR.compare(infinity, streamA) > 0);
+  }
+
+  @Test
+  public void testEqualsAndHashCode() throws Exception {
+    // Reflexivity.
+    RandomAccessData streamA = streamOf(TEST_DATA_A);
+    assertEquals(streamA, streamA);
+    assertEquals(streamA.hashCode(), streamA.hashCode());
+
+    // Distinct instances holding the same bytes are equal.
+    RandomAccessData streamACopy = streamOf(TEST_DATA_A);
+    assertEquals(streamA, streamACopy);
+    assertEquals(streamA.hashCode(), streamACopy.hashCode());
+
+    // Same length, different contents.
+    RandomAccessData streamB = streamOf(new byte[]{ 0x01, 0x02, 0x04 });
+    assertNotEquals(streamA, streamB);
+    assertNotEquals(streamA.hashCode(), streamB.hashCode());
+
+    // Different lengths (streamB is extended by appending).
+    streamB.asOutputStream().write(TEST_DATA_B);
+    assertNotEquals(streamA, streamB);
+    assertNotEquals(streamA.hashCode(), streamB.hashCode());
+  }
+
+  @Test
+  public void testResetTo() throws Exception {
+    RandomAccessData stream = streamOf(TEST_DATA_A);
+    stream.resetTo(1);
+    assertEquals(1, stream.size());
+    stream.asOutputStream().write(TEST_DATA_A);
+    assertArrayEquals(new byte[]{ 0x01, 0x01, 0x02, 0x03 }, validBytes(stream));
+  }
+
+  @Test
+  public void testAsInputStream() throws Exception {
+    RandomAccessData stream = streamOf(TEST_DATA_A);
+    try (InputStream in = stream.asInputStream(1, 1)) {
+      assertEquals(0x02, in.read());
+      assertEquals(-1, in.read());
+    }
+  }
+
+  @Test
+  public void testReadFrom() throws Exception {
+    RandomAccessData stream = new RandomAccessData();
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(TEST_DATA_A)) {
+      stream.readFrom(bais, 3, 2);
+    }
+    assertArrayEquals(new byte[]{ 0x00, 0x00, 0x00, 0x01, 0x02 }, validBytes(stream));
+  }
+
+  @Test
+  public void testWriteTo() throws Exception {
+    RandomAccessData stream = streamOf(TEST_DATA_B);
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      stream.writeTo(baos, 1, 2);
+      assertArrayEquals(new byte[]{ 0x05, 0x04 }, baos.toByteArray());
+    }
+  }
+
+  @Test
+  public void testThatRandomAccessDataGrowsWhenResettingToPositionBeyondEnd() throws Exception {
+    RandomAccessData stream = new RandomAccessData(0);
+    assertArrayEquals(new byte[0], stream.array());
+    // Forces the backing array to grow from zero capacity.
+    stream.resetTo(3);
+    assertArrayEquals(new byte[]{ 0x00, 0x00, 0x00 }, stream.array());
+  }
+
+  @Test
+  public void testThatRandomAccessDataGrowsWhenReading() throws Exception {
+    RandomAccessData stream = new RandomAccessData(0);
+    assertArrayEquals(new byte[0], stream.array());
+    stream.readFrom(new ByteArrayInputStream(TEST_DATA_A), 0, TEST_DATA_A.length);
+    assertArrayEquals(TEST_DATA_A, Arrays.copyOf(stream.array(), TEST_DATA_A.length));
+  }
+
+  @Test
+  public void testIncrement() throws Exception {
+    byte max = UnsignedBytes.MAX_VALUE;
+    assertEquals(new RandomAccessData(new byte[]{ 0x00, 0x01 }),
+        new RandomAccessData(new byte[]{ 0x00, 0x00 }).increment());
+    assertEquals(new RandomAccessData(new byte[]{ 0x01, max }),
+        new RandomAccessData(new byte[]{ 0x00, max }).increment());
+
+    // Empty, all-0xFF and infinity itself all increment to positive infinity.
+    RandomAccessData infinity = RandomAccessData.POSITIVE_INFINITY;
+    assertSame(infinity, new RandomAccessData(new byte[0]).increment());
+    assertSame(infinity, new RandomAccessData(new byte[]{ max }).increment());
+    assertSame(infinity, infinity.increment());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27979d76/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java
new file mode 100644
index 0000000..b318dee
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.apache.beam.sdk.util.TimeUtil.fromCloudDuration;
+import static org.apache.beam.sdk.util.TimeUtil.fromCloudTime;
+import static org.apache.beam.sdk.util.TimeUtil.toCloudDuration;
+import static org.apache.beam.sdk.util.TimeUtil.toCloudTime;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link TimeUtil}. */
+@RunWith(JUnit4.class)
+public final class TimeUtilTest {
+ @Test
+ public void toCloudTimeShouldPrintTimeStrings() {
+ assertEquals("1970-01-01T00:00:00Z", toCloudTime(new Instant(0)));
+ assertEquals("1970-01-01T00:00:00.001Z", toCloudTime(new Instant(1)));
+ }
+
+ @Test
+ public void fromCloudTimeShouldParseTimeStrings() {
+ assertEquals(new Instant(0), fromCloudTime("1970-01-01T00:00:00Z"));
+ assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001Z"));
+ assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000Z"));
+ assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001001Z"));
+ assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000000Z"));
+ assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000001Z"));
+ assertNull(fromCloudTime(""));
+ assertNull(fromCloudTime("1970-01-01T00:00:00"));
+ }
+
+ @Test
+ public void toCloudDurationShouldPrintDurationStrings() {
+ assertEquals("0s", toCloudDuration(Duration.ZERO));
+ assertEquals("4s", toCloudDuration(Duration.millis(4000)));
+ assertEquals("4.001s", toCloudDuration(Duration.millis(4001)));
+ }
+
+ @Test
+ public void fromCloudDurationShouldParseDurationStrings() {
+ assertEquals(Duration.millis(4000), fromCloudDuration("4s"));
+ assertEquals(Duration.millis(4001), fromCloudDuration("4.001s"));
+ assertEquals(Duration.millis(4001), fromCloudDuration("4.001000s"));
+ assertEquals(Duration.millis(4001), fromCloudDuration("4.001001s"));
+ assertEquals(Duration.millis(4001), fromCloudDuration("4.001000000s"));
+ assertEquals(Duration.millis(4001), fromCloudDuration("4.001000001s"));
+ assertNull(fromCloudDuration(""));
+ assertNull(fromCloudDuration("4"));
+ assertNull(fromCloudDuration("4.1"));
+ assertNull(fromCloudDuration("4.1s"));
+ }
+}
[2/7] incubator-beam git commit: [BEAM-151] Rebase onto apache/master
updating packages to new structure
Posted by lc...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java
deleted file mode 100644
index b318dee..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/util/TimeUtilTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.apache.beam.sdk.util.TimeUtil.fromCloudDuration;
-import static org.apache.beam.sdk.util.TimeUtil.fromCloudTime;
-import static org.apache.beam.sdk.util.TimeUtil.toCloudDuration;
-import static org.apache.beam.sdk.util.TimeUtil.toCloudTime;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Unit tests for {@link TimeUtil}, covering conversion of {@link Instant}s and
- * {@link Duration}s to and from their Cloud string representations.
- */
-@RunWith(JUnit4.class)
-public final class TimeUtilTest {
- @Test
- public void toCloudTimeShouldPrintTimeStrings() {
- assertEquals("1970-01-01T00:00:00Z", toCloudTime(new Instant(0)));
- assertEquals("1970-01-01T00:00:00.001Z", toCloudTime(new Instant(1)));
- }
-
- @Test
- public void fromCloudTimeShouldParseTimeStrings() {
- assertEquals(new Instant(0), fromCloudTime("1970-01-01T00:00:00Z"));
- // Fraction digits beyond millisecond precision are accepted and still yield 1 ms.
- assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001Z"));
- assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000Z"));
- assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001001Z"));
- assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000000Z"));
- assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000001Z"));
- // Empty input and input without the trailing 'Z' parse to null.
- assertNull(fromCloudTime(""));
- assertNull(fromCloudTime("1970-01-01T00:00:00"));
- }
-
- @Test
- public void toCloudDurationShouldPrintDurationStrings() {
- assertEquals("0s", toCloudDuration(Duration.ZERO));
- assertEquals("4s", toCloudDuration(Duration.millis(4000)));
- assertEquals("4.001s", toCloudDuration(Duration.millis(4001)));
- }
-
- @Test
- public void fromCloudDurationShouldParseDurationStrings() {
- assertEquals(Duration.millis(4000), fromCloudDuration("4s"));
- // Fraction digits beyond millisecond precision are accepted and still yield 4001 ms.
- assertEquals(Duration.millis(4001), fromCloudDuration("4.001s"));
- assertEquals(Duration.millis(4001), fromCloudDuration("4.001000s"));
- assertEquals(Duration.millis(4001), fromCloudDuration("4.001001s"));
- assertEquals(Duration.millis(4001), fromCloudDuration("4.001000000s"));
- assertEquals(Duration.millis(4001), fromCloudDuration("4.001000001s"));
- // Malformed inputs (empty, missing unit, or too few fraction digits) parse to null.
- assertNull(fromCloudDuration(""));
- assertNull(fromCloudDuration("4"));
- assertNull(fromCloudDuration("4.1"));
- assertNull(fromCloudDuration("4.1s"));
- }
-}
[4/7] incubator-beam git commit: [BEAM-151] Rebase onto apache/master
updating packages to new structure
Posted by lc...@apache.org.
[BEAM-151] Rebase onto apache/master updating packages to new structure
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6023d26a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6023d26a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6023d26a
Branch: refs/heads/master
Commit: 6023d26afede897160d1ad59f4989d857fe77b00
Parents: 27979d7
Author: Luke Cwik <lc...@google.com>
Authored: Wed Apr 27 10:35:33 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Apr 27 17:26:30 2016 -0700
----------------------------------------------------------------------
.../runners/dataflow/DataflowPipelineJob.java | 2 +-
.../dataflow/DataflowPipelineRunner.java | 8 +-
.../dataflow/DataflowPipelineTranslator.java | 4 +-
.../runners/dataflow/internal/IsmFormat.java | 811 +++++++++++++++++++
.../beam/runners/dataflow/util/DoFnInfo.java | 69 ++
.../runners/dataflow/util/MonitoringUtil.java | 3 +-
.../runners/dataflow/util/OutputReference.java | 43 +
.../runners/dataflow/util/RandomAccessData.java | 356 ++++++++
.../beam/runners/dataflow/util/TimeUtil.java | 166 ++++
.../beam/sdk/runners/worker/IsmFormat.java | 811 -------------------
.../beam/sdk/runners/worker/package-info.java | 25 -
.../java/org/apache/beam/sdk/util/DoFnInfo.java | 68 --
.../apache/beam/sdk/util/OutputReference.java | 43 -
.../apache/beam/sdk/util/RandomAccessData.java | 355 --------
.../java/org/apache/beam/sdk/util/TimeUtil.java | 166 ----
.../dataflow/DataflowPipelineRunnerTest.java | 8 +-
.../DataflowPipelineTranslatorTest.java | 2 +-
.../testing/TestDataflowPipelineRunnerTest.java | 2 +-
.../dataflow/util/MonitoringUtilTest.java | 1 -
.../dataflow/util/RandomAccessDataTest.java | 207 +++++
.../runners/dataflow/util/TimeUtilTest.java | 75 ++
.../beam/sdk/util/RandomAccessDataTest.java | 207 -----
.../org/apache/beam/sdk/util/TimeUtilTest.java | 75 --
23 files changed, 1741 insertions(+), 1766 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 19df0a1..69565ac 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.dataflow;
-import static org.apache.beam.sdk.util.TimeUtil.fromCloudTime;
+import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
index ec4a60c..2f01101 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
@@ -29,6 +29,10 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTran
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.internal.AssignWindows;
import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
+import org.apache.beam.runners.dataflow.internal.IsmFormat;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
import org.apache.beam.runners.dataflow.internal.PubsubIOTranslator;
import org.apache.beam.runners.dataflow.internal.ReadTranslator;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
@@ -70,10 +74,6 @@ import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.AggregatorPipelineExtractor;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformTreeNode;
-import org.apache.beam.sdk.runners.worker.IsmFormat;
-import org.apache.beam.sdk.runners.worker.IsmFormat.IsmRecord;
-import org.apache.beam.sdk.runners.worker.IsmFormat.IsmRecordCoder;
-import org.apache.beam.sdk.runners.worker.IsmFormat.MetadataKeyCoder;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 0f2d325..4ef1bdb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -35,6 +35,8 @@ import org.apache.beam.runners.dataflow.internal.BigQueryIOTranslator;
import org.apache.beam.runners.dataflow.internal.PubsubIOTranslator;
import org.apache.beam.runners.dataflow.internal.ReadTranslator;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.DoFnInfo;
+import org.apache.beam.runners.dataflow.util.OutputReference;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.Coder;
@@ -57,8 +59,6 @@ import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.DoFnInfo;
-import org.apache.beam.sdk.util.OutputReference;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
new file mode 100644
index 0000000..1969cfb
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -0,0 +1,811 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.internal;
+
+import static org.apache.beam.sdk.util.Structs.addLong;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.runners.dataflow.util.RandomAccessData;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * An Ism file is a prefix encoded composite key value file broken into shards. Each composite
+ * key is composed of a fixed number of component keys. A fixed number of those sub keys represent
+ * the shard key portion; see {@link IsmRecord} and {@link IsmRecordCoder} for further details
+ * around the data format. In addition to the data, there is a bloom filter,
+ * and multiple indices to allow for efficient retrieval.
+ *
+ * <p>An Ism file is composed of these high level sections (in order):
+ * <ul>
+ * <li>shard block</li>
+ * <li>bloom filter (See {@code ScalableBloomFilter} for details on encoding format)</li>
+ * <li>shard index</li>
+ * <li>footer (See {@link Footer} for details on encoding format)</li>
+ * </ul>
+ *
+ * <p>The shard block is composed of multiple copies of the following:
+ * <ul>
+ * <li>data block</li>
+ * <li>data index</li>
+ * </ul>
+ *
+ * <p>The data block is composed of multiple copies of the following:
+ * <ul>
+ * <li>key prefix (See {@link KeyPrefix} for details on encoding format)</li>
+ * <li>unshared key bytes</li>
+ * <li>value bytes</li>
+ * <li>optional 0x00 0x00 bytes followed by metadata bytes
+ * (if the following 0x00 0x00 bytes are not present, then there are no metadata bytes)</li>
+ * </ul>
+ * <p>Each key written into the data block must be in unsigned lexicographically increasing
+ * order, and the shard portion of each key must hash to the same shard id as all other keys
+ * within the same data block. The hashing function used is the
+ * <a href="https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp">
+ * 32-bit murmur3 algorithm, x86 variant</a> (little-endian variant),
+ * using {@code 1225801234} as the seed value.
+ *
+ * <p>The data index is composed of {@code N} copies of the following:
+ * <ul>
+ * <li>key prefix (See {@link KeyPrefix} for details on encoding format)</li>
+ * <li>unshared key bytes</li>
+ * <li>byte offset to key prefix in data block (variable length long coding)</li>
+ * </ul>
+ *
+ * <p>The shard index is composed of a {@link VarInt variable length integer} encoding representing
+ * the number of shard index records followed by that many shard index records.
+ * See {@link IsmShardCoder} for further details as to its encoding scheme.
+ */
+public class IsmFormat {
+  /** Seed for the murmur3 hash function used to assign keys to shards. */
+  private static final int HASH_SEED = 1225801234;
+  private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32(HASH_SEED);
+  /**
+   * Mask applied to a key's hash to select one of the value shards {@code [0, 127]};
+   * metadata shards are offset past this range and occupy {@code [128, 255]}.
+   */
+  public static final int SHARD_BITS = (1 << 7) - 1;
+
+ /**
+ * A record containing a composite key and either a value or metadata. The composite key
+ * must not contain the metadata key component place holder if producing a value record, and must
+ * contain the metadata component key place holder if producing a metadata record.
+ *
+ * <p>The composite key is a fixed number of component keys where the first {@code N} component
+ * keys are used to create a shard id via hashing. See {@link IsmRecordCoder#hash(List)} for
+ * further details.
+ */
+ @AutoValue
+ public abstract static class IsmRecord<V> {
+ abstract List<?> keyComponents();
+ @Nullable abstract V value();
+ @Nullable abstract byte[] metadata();
+
+ IsmRecord() {} // Prevent public constructor
+
+ /** Returns an IsmRecord with the specified key components and value. */
+ public static <V> IsmRecord<V> of(List<?> keyComponents, V value) {
+ checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
+ checkArgument(!isMetadataKey(keyComponents),
+ "Expected key components to not contain metadata key.");
+ return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, value, null);
+ }
+
+ public static <V> IsmRecord<V> meta(List<?> keyComponents, byte[] metadata) {
+ checkNotNull(metadata);
+ checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
+ checkArgument(isMetadataKey(keyComponents),
+ "Expected key components to contain metadata key.");
+ return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, null, metadata);
+ }
+
+ /** Returns the list of key components. */
+ public List<?> getKeyComponents() {
+ return keyComponents();
+ }
+
+ /** Returns the key component at the specified index. */
+ public Object getKeyComponent(int index) {
+ return keyComponents().get(index);
+ }
+
+ /**
+ * Returns the value. Throws {@link IllegalStateException} if this is not a
+ * value record.
+ */
+ public V getValue() {
+ checkState(!isMetadataKey(keyComponents()),
+ "This is a metadata record and not a value record.");
+ return value();
+ }
+
+ /**
+ * Returns the metadata. Throws {@link IllegalStateException} if this is not a
+ * metadata record.
+ */
+ public byte[] getMetadata() {
+ checkState(isMetadataKey(keyComponents()),
+ "This is a value record and not a metadata record.");
+ return metadata();
+ }
+ }
+
+  /**
+   * A {@link Coder} for {@link IsmRecord}s.
+   *
+   * <p>Note that this coder standalone will not produce an Ism file. This coder can be used
+   * to materialize a {@link PCollection} of {@link IsmRecord}s. Only when this coder
+   * is combined with an {@code IsmSink} will one produce an Ism file.
+   *
+   * <p>The {@link IsmRecord} encoded format is:
+   * <ul>
+   *   <li>encoded key component 1 using key component coder 1</li>
+   *   <li>...</li>
+   *   <li>encoded key component N using key component coder N</li>
+   *   <li>for a value record, the value encoded using the value coder; for a metadata
+   *       record, the metadata bytes encoded using {@link ByteArrayCoder}</li>
+   * </ul>
+   */
+  public static class IsmRecordCoder<V>
+      extends StandardCoder<IsmRecord<V>> {
+    /**
+     * Returns an IsmRecordCoder with the specified key component coders and value coder.
+     *
+     * @param numberOfShardKeyCoders how many leading key components are hashed to compute the
+     *     shard id of a value record; must be positive and at most the number of key components
+     * @param numberOfMetadataShardKeyCoders how many leading key components are hashed to
+     *     compute the shard id of a metadata record; must be at most the number of key
+     *     components
+     * @param keyComponentCoders the coder for each key component, in key order; non-empty
+     * @param valueCoder the coder for the values of value records
+     * @throws NullPointerException if {@code keyComponentCoders} is null
+     * @throws IllegalArgumentException if any of the size constraints above is violated
+     */
+    public static <V> IsmRecordCoder<V> of(
+        int numberOfShardKeyCoders,
+        int numberOfMetadataShardKeyCoders,
+        List<Coder<?>> keyComponentCoders,
+        Coder<V> valueCoder) {
+      checkNotNull(keyComponentCoders);
+      checkArgument(keyComponentCoders.size() > 0);
+      checkArgument(numberOfShardKeyCoders > 0);
+      checkArgument(numberOfShardKeyCoders <= keyComponentCoders.size());
+      checkArgument(numberOfMetadataShardKeyCoders <= keyComponentCoders.size());
+      return new IsmRecordCoder<>(
+          numberOfShardKeyCoders,
+          numberOfMetadataShardKeyCoders,
+          keyComponentCoders,
+          valueCoder);
+    }
+
+    /**
+     * Returns an IsmRecordCoder with the specified coders. Note that this method is not meant
+     * to be called by users but used by Jackson when decoding this coder.
+     *
+     * <p>{@code components} holds the key component coders followed by the value coder as its
+     * last element.
+     *
+     * @throws IllegalArgumentException if {@code components} has fewer than 2 elements
+     */
+    @JsonCreator
+    public static IsmRecordCoder<?> of(
+        @JsonProperty(PropertyNames.NUM_SHARD_CODERS) int numberOfShardCoders,
+        @JsonProperty(PropertyNames.NUM_METADATA_SHARD_CODERS) int numberOfMetadataShardCoders,
+        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
+      // Template form: the message is only built when the check fails.
+      checkArgument(components.size() >= 2,
+          "Expecting at least 2 components, got %s", components.size());
+      return of(
+          numberOfShardCoders,
+          numberOfMetadataShardCoders,
+          components.subList(0, components.size() - 1),
+          components.get(components.size() - 1));
+    }
+
+    private final int numberOfShardKeyCoders;
+    private final int numberOfMetadataShardKeyCoders;
+    private final List<Coder<?>> keyComponentCoders;
+    private final Coder<V> valueCoder;
+
+    private IsmRecordCoder(
+        int numberOfShardKeyCoders,
+        int numberOfMetadataShardKeyCoders,
+        List<Coder<?>> keyComponentCoders, Coder<V> valueCoder) {
+      this.numberOfShardKeyCoders = numberOfShardKeyCoders;
+      this.numberOfMetadataShardKeyCoders = numberOfMetadataShardKeyCoders;
+      this.keyComponentCoders = keyComponentCoders;
+      this.valueCoder = valueCoder;
+    }
+
+    /** Returns the list of key component coders. */
+    public List<Coder<?>> getKeyComponentCoders() {
+      return keyComponentCoders;
+    }
+
+    /**
+     * Returns the key coder at the specified index.
+     *
+     * <p>The raw return type lets callers hand it a key component typed as {@code Object}.
+     */
+    public Coder getKeyComponentCoder(int index) {
+      return keyComponentCoders.get(index);
+    }
+
+    /** Returns the value coder. */
+    public Coder<V> getValueCoder() {
+      return valueCoder;
+    }
+
+    /**
+     * Encodes each key component with its coder. This is followed by the metadata bytes for a
+     * metadata record, or by the value for a value record.
+     *
+     * @throws CoderException if the number of key components differs from the number of key
+     *     component coders
+     */
+    @Override
+    public void encode(IsmRecord<V> value, OutputStream outStream,
+        Coder.Context context) throws CoderException, IOException {
+      if (value.getKeyComponents().size() != keyComponentCoders.size()) {
+        throw new CoderException(String.format(
+            "Expected %s key component(s) but received key component(s) %s.",
+            keyComponentCoders.size(), value.getKeyComponents()));
+      }
+      for (int i = 0; i < keyComponentCoders.size(); ++i) {
+        getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream, context.nested());
+      }
+      if (isMetadataKey(value.getKeyComponents())) {
+        ByteArrayCoder.of().encode(value.getMetadata(), outStream, context.nested());
+      } else {
+        valueCoder.encode(value.getValue(), outStream, context.nested());
+      }
+    }
+
+    /** Decodes a record written by {@link #encode}. */
+    @Override
+    public IsmRecord<V> decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size());
+      for (Coder<?> keyCoder : keyComponentCoders) {
+        keyComponents.add(keyCoder.decode(inStream, context.nested()));
+      }
+      if (isMetadataKey(keyComponents)) {
+        return IsmRecord.<V>meta(
+            keyComponents, ByteArrayCoder.of().decode(inStream, context.nested()));
+      } else {
+        return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream, context.nested()));
+      }
+    }
+
+    /**
+     * Returns how many leading key components are hashed to compute the shard id for
+     * {@code keyComponents}. The count depends on whether they form a metadata key.
+     */
+    public int getNumberOfShardKeyCoders(List<?> keyComponents) {
+      if (isMetadataKey(keyComponents)) {
+        return numberOfMetadataShardKeyCoders;
+      } else {
+        return numberOfShardKeyCoders;
+      }
+    }
+
+    /**
+     * Computes the shard id for the given key component(s).
+     *
+     * <p>The shard keys are encoded into their byte representations and hashed using the
+     * <a href="https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp">
+     * 32-bit murmur3 algorithm, x86 variant</a> (little-endian variant),
+     * using {@code 1225801234} as the seed value. We ensure that shard ids for
+     * metadata keys and normal keys do not overlap.
+     */
+    public int hash(List<?> keyComponents) {
+      return encodeAndHash(keyComponents, new RandomAccessData(), new ArrayList<Integer>());
+    }
+
+    /**
+     * Computes the shard id for the given key component(s).
+     *
+     * <p>Mutates {@code keyBytesToMutate} such that, when this returns, it contains the
+     * encoded version of the key components.
+     */
+    public int encodeAndHash(List<?> keyComponents, RandomAccessData keyBytesToMutate) {
+      return encodeAndHash(keyComponents, keyBytesToMutate, new ArrayList<Integer>());
+    }
+
+    /**
+     * Computes the shard id for the given key component(s).
+     *
+     * <p>Mutates {@code keyBytesToMutate} such that, when this returns, it contains the
+     * encoded version of the key components. Also appends to
+     * {@code keyComponentByteOffsetsToMutate}, for each key component, the offset within
+     * {@code keyBytesToMutate} at which that component's encoded byte representation ends.
+     *
+     * <p>Value keys map to shard ids {@code [0, SHARD_BITS]}. Metadata keys map to
+     * {@code [SHARD_BITS + 1, 2 * SHARD_BITS + 1]}, so the two ranges never overlap.
+     *
+     * @throws NullPointerException if {@code keyComponents} is null
+     * @throws IllegalArgumentException if there are more key components than key component
+     *     coders, or fewer than the number of shard key components that apply to this key
+     * @throws IllegalStateException if encoding a key component fails
+     */
+    public int encodeAndHash(
+        List<?> keyComponents,
+        RandomAccessData keyBytesToMutate,
+        List<Integer> keyComponentByteOffsetsToMutate) {
+      checkNotNull(keyComponents);
+      checkArgument(keyComponents.size() <= keyComponentCoders.size(),
+          "Expected at most %s key component(s) but received %s.",
+          keyComponentCoders.size(), keyComponents);
+
+      final int numberOfKeyCodersToUse;
+      final int shardOffset;
+      if (isMetadataKey(keyComponents)) {
+        numberOfKeyCodersToUse = numberOfMetadataShardKeyCoders;
+        // Shift metadata shard ids past the value shard id range so they never collide.
+        shardOffset = SHARD_BITS + 1;
+      } else {
+        numberOfKeyCodersToUse = numberOfShardKeyCoders;
+        shardOffset = 0;
+      }
+
+      // Report the shard key count that actually applies to this (value or metadata) key.
+      checkArgument(numberOfKeyCodersToUse <= keyComponents.size(),
+          "Expected at least %s key component(s) but received %s.",
+          numberOfKeyCodersToUse, keyComponents);
+
+      try {
+        // Encode the shard portion, then hash the bytes written so far.
+        for (int i = 0; i < numberOfKeyCodersToUse; ++i) {
+          getKeyComponentCoder(i).encode(
+              keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
+          keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
+        }
+        int rval = HASH_FUNCTION.hashBytes(
+            keyBytesToMutate.array(), 0, keyBytesToMutate.size()).asInt() & SHARD_BITS;
+        rval += shardOffset;
+
+        // Encode the remainder of the key; it is written after the hash is computed.
+        for (int i = numberOfKeyCodersToUse; i < keyComponents.size(); ++i) {
+          getKeyComponentCoder(i).encode(
+              keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
+          keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
+        }
+        return rval;
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            String.format("Failed to hash %s with coder %s", keyComponents, this), e);
+      }
+    }
+
+    /** Returns the key component coders followed by the value coder. */
+    @Override
+    public List<Coder<?>> getCoderArguments() {
+      return ImmutableList.<Coder<?>>builder()
+          .addAll(keyComponentCoders)
+          .add(valueCoder)
+          .build();
+    }
+
+    /** Adds the shard key coder counts so {@link #of(int, int, List)} can rebuild this coder. */
+    @Override
+    public CloudObject asCloudObject() {
+      CloudObject cloudObject = super.asCloudObject();
+      addLong(cloudObject, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders);
+      addLong(cloudObject, PropertyNames.NUM_METADATA_SHARD_CODERS, numberOfMetadataShardKeyCoders);
+      return cloudObject;
+    }
+
+    @Override
+    public void verifyDeterministic() throws Coder.NonDeterministicException {
+      verifyDeterministic("Key component coders expected to be deterministic.", keyComponentCoders);
+      verifyDeterministic("Value coder expected to be deterministic.", valueCoder);
+    }
+
+    /** Returns true only if every key component coder and the value coder are. */
+    @Override
+    public boolean consistentWithEquals() {
+      for (Coder<?> keyComponentCoder : keyComponentCoders) {
+        if (!keyComponentCoder.consistentWithEquals()) {
+          return false;
+        }
+      }
+      return valueCoder.consistentWithEquals();
+    }
+
+    /**
+     * Returns a structural value for {@code record}. When this coder is consistent with equals,
+     * this is built from the structural value of each key component and of the value.
+     *
+     * @throws IllegalStateException if a non-null record's key component count differs from
+     *     the number of key component coders
+     */
+    @Override
+    public Object structuralValue(IsmRecord<V> record) throws Exception {
+      // Check for null before touching the record so the null path is actually reachable.
+      if (record == null) {
+        return super.structuralValue(record);
+      }
+      checkState(record.getKeyComponents().size() == keyComponentCoders.size(),
+          "Expected the number of key component coders %s "
+          + "to match the number of key components %s.",
+          keyComponentCoders.size(), record.getKeyComponents());
+
+      if (consistentWithEquals()) {
+        List<Object> keyComponentStructuralValues =
+            new ArrayList<>(keyComponentCoders.size());
+        for (int i = 0; i < keyComponentCoders.size(); ++i) {
+          keyComponentStructuralValues.add(
+              getKeyComponentCoder(i).structuralValue(record.getKeyComponent(i)));
+        }
+        if (isMetadataKey(record.getKeyComponents())) {
+          return IsmRecord.meta(keyComponentStructuralValues, record.getMetadata());
+        } else {
+          return IsmRecord.of(keyComponentStructuralValues,
+              valueCoder.structuralValue(record.getValue()));
+        }
+      }
+      return super.structuralValue(record);
+    }
+  }
+
+ /**
+ * Validates that the key portion of the given coder is deterministic.
+ */
+ public static void validateCoderIsCompatible(IsmRecordCoder<?> coder) {
+ for (Coder<?> keyComponentCoder : coder.getKeyComponentCoders()) {
+ try {
+ keyComponentCoder.verifyDeterministic();
+ } catch (NonDeterministicException e) {
+ throw new IllegalArgumentException(
+ String.format("Key component coder %s is expected to be deterministic.",
+ keyComponentCoder), e);
+ }
+ }
+ }
+
+ /** Returns true if and only if any of the passed in key components represent a metadata key. */
+ public static boolean isMetadataKey(List<?> keyComponents) {
+ for (Object keyComponent : keyComponents) {
+ if (keyComponent == METADATA_KEY) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+  /**
+   * Sentinel object used as the wildcard metadata key component. It is equal only to itself,
+   * has a fixed hash code, and prints as {@code META}.
+   */
+  private static final Object METADATA_KEY = new Object() {
+    @Override
+    public int hashCode() {
+      return -1248902349;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return other == this;
+    }
+
+    @Override
+    public String toString() {
+      return "META";
+    }
+  };
+
+  /**
+   * Returns the sentinel object that stands in for a key component as a wildcard.
+   * It is encoded using {@link MetadataKeyCoder}.
+   */
+  public static Object getMetadataKey() {
+    return METADATA_KEY;
+  }
+
+  /**
+   * A coder for a metadata key component. It wraps a key component coder so that the metadata
+   * key component (see {@link #getMetadataKey()}) can be used as a placeholder instead of an
+   * actual key.
+   *
+   * <p>Each value is encoded as a single marker byte:
+   * <ul>
+   *   <li>{@code 0}: the metadata key; nothing else follows</li>
+   *   <li>{@code 1}: a regular key, followed by its nested encoding using the wrapped coder</li>
+   * </ul>
+   */
+  public static class MetadataKeyCoder<K> extends StandardCoder<K> {
+    /** Marker byte written for the metadata key sentinel. */
+    private static final int METADATA_KEY_MARKER = 0;
+    /** Marker byte written before a regular key. */
+    private static final int KEY_MARKER = 1;
+
+    /** Returns a MetadataKeyCoder wrapping {@code keyCoder}, which must not be null. */
+    public static <K> MetadataKeyCoder<K> of(Coder<K> keyCoder) {
+      checkNotNull(keyCoder);
+      return new MetadataKeyCoder<>(keyCoder);
+    }
+
+    /**
+     * Returns a MetadataKeyCoder wrapping the single given component coder. Note that this
+     * method is not meant to be called by users but used by Jackson when decoding this coder.
+     *
+     * @throws IllegalArgumentException if {@code components} does not hold exactly one coder
+     */
+    @JsonCreator
+    public static MetadataKeyCoder<?> of(
+        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
+      Preconditions.checkArgument(components.size() == 1,
+          "Expecting one component, got " + components.size());
+      return of(components.get(0));
+    }
+
+    private final Coder<K> keyCoder;
+
+    private MetadataKeyCoder(Coder<K> keyCoder) {
+      this.keyCoder = keyCoder;
+    }
+
+    /** Returns the wrapped key coder. */
+    public Coder<K> getKeyCoder() {
+      return keyCoder;
+    }
+
+    @Override
+    public void encode(K value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      // The sentinel is matched by identity, consistent with isMetadataKey().
+      if (value == METADATA_KEY) {
+        outStream.write(METADATA_KEY_MARKER);
+      } else {
+        outStream.write(KEY_MARKER);
+        keyCoder.encode(value, outStream, context.nested());
+      }
+    }
+
+    /**
+     * @throws CoderException if the stream ends before the marker byte or the marker is
+     *     neither 0 nor 1
+     */
+    @Override
+    public K decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      int marker = inStream.read();
+      if (marker == METADATA_KEY_MARKER) {
+        // The sentinel stands in for any key type, so this cast is unchecked by design.
+        @SuppressWarnings("unchecked")
+        K metadataKey = (K) getMetadataKey();
+        return metadataKey;
+      } else if (marker == KEY_MARKER) {
+        return keyCoder.decode(inStream, context.nested());
+      } else if (marker == -1) {
+        throw new CoderException("Unexpected end of stream while reading metadata key marker.");
+      } else {
+        throw new CoderException(String.format("Expected marker but got %s.", marker));
+      }
+    }
+
+    @Override
+    public List<Coder<?>> getCoderArguments() {
+      return ImmutableList.<Coder<?>>of(keyCoder);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      verifyDeterministic("Expected key coder to be deterministic", keyCoder);
+    }
+  }
+
+ /**
+ * A shard descriptor containing shard id, the data block offset, and the index offset for the
+ * given shard.
+ */
+ @AutoValue
+ public abstract static class IsmShard {
+ abstract int id();
+ abstract long blockOffset();
+ abstract long indexOffset();
+
+ IsmShard() {}
+
+ /** Returns an IsmShard with the given id, block offset and no index offset. */
+ public static IsmShard of(int id, long blockOffset) {
+ IsmShard ismShard = new AutoValue_IsmFormat_IsmShard(id, blockOffset, -1);
+ checkState(id >= 0,
+ "%s attempting to be written with negative shard id.",
+ ismShard);
+ checkState(blockOffset >= 0,
+ "%s attempting to be written with negative block offset.",
+ ismShard);
+ return ismShard;
+ }
+
+ /** Returns an IsmShard with the given id, block offset, and index offset. */
+ public static IsmShard of(int id, long blockOffset, long indexOffset) {
+ IsmShard ismShard = new AutoValue_IsmFormat_IsmShard(id, blockOffset, indexOffset);
+ checkState(id >= 0,
+ "%s attempting to be written with negative shard id.",
+ ismShard);
+ checkState(blockOffset >= 0,
+ "%s attempting to be written with negative block offset.",
+ ismShard);
+ checkState(indexOffset >= 0,
+ "%s attempting to be written with negative index offset.",
+ ismShard);
+ return ismShard;
+ }
+
+ /** Return the shard id. */
+ public int getId() {
+ return id();
+ }
+
+ /** Return the absolute position within the Ism file where the data block begins. */
+ public long getBlockOffset() {
+ return blockOffset();
+ }
+
+ /**
+ * Return the absolute position within the Ism file where the index block begins.
+ * Throws {@link IllegalStateException} if the index offset was never specified.
+ */
+ public long getIndexOffset() {
+ checkState(indexOffset() >= 0,
+ "Unable to fetch index offset because it was never specified.");
+ return indexOffset();
+ }
+
+ /** Returns a new IsmShard like this one with the specified index offset. */
+ public IsmShard withIndexOffset(long indexOffset) {
+ return of(id(), blockOffset(), indexOffset);
+ }
+ }
+
+  /**
+   * Coder for the shard index: a {@link ListCoder} whose elements are encoded with
+   * {@link IsmShardCoder}. See those two coders for the encoding specification.
+   */
+  public static final Coder<List<IsmShard>> ISM_SHARD_INDEX_CODER =
+      ListCoder.<IsmShard>of(IsmShardCoder.of());
+
+  /**
+   * A coder for {@link IsmShard}s.
+   *
+   * <p>The shard descriptor is encoded as:
+   * <ul>
+   *   <li>id (variable length integer encoding)</li>
+   *   <li>blockOffset (variable length long encoding)</li>
+   *   <li>indexOffset (variable length long encoding)</li>
+   * </ul>
+   *
+   * <p>Only shards whose index offset has been specified can be encoded.
+   */
+  public static class IsmShardCoder extends AtomicCoder<IsmShard> {
+    private static final IsmShardCoder INSTANCE = new IsmShardCoder();
+
+    /** Returns an IsmShardCoder. */
+    @JsonCreator
+    public static IsmShardCoder of() {
+      return INSTANCE;
+    }
+
+    private IsmShardCoder() {
+    }
+
+    /**
+     * @throws IllegalStateException if {@code value} has no index offset
+     */
+    @Override
+    public void encode(IsmShard value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      // Read the raw property: getIndexOffset() would already throw its own generic message
+      // for a missing offset, leaving this shard-specific check unreachable.
+      checkState(value.indexOffset() >= 0,
+          "%s attempting to be written without index offset.",
+          value);
+      VarIntCoder.of().encode(value.getId(), outStream, context.nested());
+      VarLongCoder.of().encode(value.getBlockOffset(), outStream, context.nested());
+      VarLongCoder.of().encode(value.getIndexOffset(), outStream, context.nested());
+    }
+
+    @Override
+    public IsmShard decode(
+        InputStream inStream, Coder.Context context) throws CoderException, IOException {
+      // Each field is one of several values in the stream; decode in the nested context,
+      // mirroring encode(). Arguments are evaluated left to right: id, block, index offset.
+      return IsmShard.of(
+          VarIntCoder.of().decode(inStream, context.nested()),
+          VarLongCoder.of().decode(inStream, context.nested()),
+          VarLongCoder.of().decode(inStream, context.nested()));
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+  }
+
+ /**
+ * The prefix written before each key. It holds the number of leading bytes the key shares
+ * with the previously read key and the number of remaining (unshared) bytes of the key. The
+ * key prefix along with the previous key and the unshared key bytes allows one to construct
+ * the current key by doing the following
+ * {@code currentKey = previousKey[0 : sharedBytes] + read(unsharedBytes)}.
+ *
+ * <p>The key prefix is encoded by {@link KeyPrefixCoder} as:
+ * <ul>
+ * <li>number of shared key bytes (variable length integer coding)</li>
+ * <li>number of unshared key bytes (variable length integer coding)</li>
+ * </ul>
+ */
+ @AutoValue
+ public abstract static class KeyPrefix {
+ // NOTE(review): AutoValue passes properties to the generated constructor in the declaration
+ // order of these abstract accessors, which of() relies on; keep this order.
+ /** Returns the number of leading bytes this key shares with the previous key. */
+ public abstract int getSharedKeySize();
+ /** Returns the number of bytes of this key that follow the shared prefix. */
+ public abstract int getUnsharedKeySize();
+
+ /** Returns a KeyPrefix with the given shared and unshared key sizes. */
+ public static KeyPrefix of(int sharedKeySize, int unsharedKeySize) {
+ return new AutoValue_IsmFormat_KeyPrefix(sharedKeySize, unsharedKeySize);
+ }
+ }
+
+  /**
+   * A {@link Coder} for {@link KeyPrefix}. Both sizes are written with {@link VarInt}, which is
+   * self-delimiting, so the same encoding is used whatever the {@link Coder.Context}.
+   */
+  public static final class KeyPrefixCoder extends AtomicCoder<KeyPrefix> {
+    private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder();
+
+    /** Returns the singleton KeyPrefixCoder. */
+    @JsonCreator
+    public static KeyPrefixCoder of() {
+      return INSTANCE;
+    }
+
+    // Singleton: obtain the shared instance through of().
+    private KeyPrefixCoder() {
+    }
+
+    @Override
+    public void encode(KeyPrefix value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      VarInt.encode(value.getSharedKeySize(), outStream);
+      VarInt.encode(value.getUnsharedKeySize(), outStream);
+    }
+
+    @Override
+    public KeyPrefix decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      // Arguments are evaluated left to right: shared size first, then unshared size.
+      return KeyPrefix.of(VarInt.decodeInt(inStream), VarInt.decodeInt(inStream));
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(KeyPrefix value, Coder.Context context) {
+      return true;
+    }
+
+    @Override
+    public long getEncodedElementByteSize(KeyPrefix value, Coder.Context context)
+        throws Exception {
+      Preconditions.checkNotNull(value);
+      return VarInt.getLength(value.getSharedKeySize())
+          + VarInt.getLength(value.getUnsharedKeySize());
+    }
+  }
+
+  /**
+   * The footer stores the relevant information required to locate the index and bloom filter.
+   * It also stores a version byte and the number of keys stored.
+   *
+   * <p>The footer is encoded (see {@link FooterCoder}) as the value containing, in order:
+   * <ul>
+   *   <li>start of shard index position offset (big endian long coding)</li>
+   *   <li>start of bloom filter offset (big endian long coding)</li>
+   *   <li>number of keys in file (big endian long coding)</li>
+   *   <li>{@link #VERSION} (version key as a single byte)</li>
+   * </ul>
+   */
+  @AutoValue
+  public abstract static class Footer {
+    public static final int LONG_BYTES = 8;
+    /** Encoded size in bytes: three longs followed by the single version byte. */
+    public static final int FIXED_LENGTH = 3 * LONG_BYTES + 1;
+    /** The only footer version written and accepted. */
+    public static final byte VERSION = 2;
+
+    // NOTE(review): AutoValue passes properties to the generated constructor in the declaration
+    // order of these abstract accessors, which of() relies on; keep this order.
+    public abstract byte getVersion();
+    public abstract long getIndexPosition();
+    public abstract long getBloomFilterPosition();
+    public abstract long getNumberOfKeys();
+
+    /** Returns a footer with the current {@link #VERSION} and the given positions and count. */
+    public static Footer of(long indexPosition, long bloomFilterPosition, long numberOfKeys) {
+      return new AutoValue_IsmFormat_Footer(
+          VERSION, indexPosition, bloomFilterPosition, numberOfKeys);
+    }
+  }
+
+  /** A {@link Coder} for {@link Footer}. The encoding is fixed-length. */
+  public static final class FooterCoder extends AtomicCoder<Footer> {
+    private static final FooterCoder INSTANCE = new FooterCoder();
+
+    /** Returns the singleton FooterCoder. */
+    @JsonCreator
+    public static FooterCoder of() {
+      return INSTANCE;
+    }
+
+    // Singleton: obtain the shared instance through of().
+    private FooterCoder() {
+    }
+
+    @Override
+    public void encode(Footer value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      // DataOutputStream writes each long as 8 bytes, high byte first.
+      DataOutputStream dataOut = new DataOutputStream(outStream);
+      dataOut.writeLong(value.getIndexPosition());
+      dataOut.writeLong(value.getBloomFilterPosition());
+      dataOut.writeLong(value.getNumberOfKeys());
+      dataOut.write(Footer.VERSION);
+    }
+
+    /**
+     * @throws IOException if the stream ends early or the version byte is not
+     *     {@link Footer#VERSION}
+     */
+    @Override
+    public Footer decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      DataInputStream dataIn = new DataInputStream(inStream);
+      // Arguments are evaluated left to right, matching the order written by encode().
+      Footer footer = Footer.of(dataIn.readLong(), dataIn.readLong(), dataIn.readLong());
+      // readUnsignedByte() reports a truncated footer with an EOFException rather than
+      // returning -1 and failing with a misleading "Unknown version -1".
+      int version = dataIn.readUnsignedByte();
+      if (version != Footer.VERSION) {
+        throw new IOException("Unknown version " + version + ". "
+            + "Only version " + Footer.VERSION + " is currently supported.");
+      }
+      return footer;
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(Footer value, Coder.Context context) {
+      return true;
+    }
+
+    @Override
+    public long getEncodedElementByteSize(Footer value, Coder.Context context)
+        throws Exception {
+      return Footer.FIXED_LENGTH;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
new file mode 100644
index 0000000..f83acbc
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import java.io.Serializable;
+
+/**
+ * Serializable wrapper holding a {@link DoFn} together with its {@link WindowingStrategy} and,
+ * when supplied, its side input views and input {@link Coder}.
+ *
+ * @param <InputT> the type of the (main) input elements of the DoFn
+ * @param <OutputT> the type of the (main) output elements of the DoFn
+ */
+public class DoFnInfo<InputT, OutputT> implements Serializable {
+  private final DoFn<InputT, OutputT> doFn;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private final Iterable<PCollectionView<?>> sideInputViews;
+  private final Coder<InputT> inputCoder;
+
+  /**
+   * Creates an instance without side input views or an input coder; the corresponding getters
+   * return {@code null}.
+   */
+  public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) {
+    this(doFn, windowingStrategy, null, null);
+  }
+
+  /** Creates an instance holding all of the given values as-is. */
+  public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy,
+      Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> inputCoder) {
+    this.doFn = doFn;
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputViews = sideInputViews;
+    this.inputCoder = inputCoder;
+  }
+
+  /** Returns the wrapped DoFn. */
+  public DoFn<InputT, OutputT> getDoFn() {
+    return doFn;
+  }
+
+  /** Returns the windowing strategy. */
+  public WindowingStrategy<?, ?> getWindowingStrategy() {
+    return windowingStrategy;
+  }
+
+  /** Returns the side input views, or {@code null} if none were supplied. */
+  public Iterable<PCollectionView<?>> getSideInputViews() {
+    return sideInputViews;
+  }
+
+  /** Returns the input coder, or {@code null} if none was supplied. */
+  public Coder<InputT> getInputCoder() {
+    return inputCoder;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
index 2eec9cc..67cdfa6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
@@ -17,11 +17,10 @@
*/
package org.apache.beam.runners.dataflow.util;
-import static org.apache.beam.sdk.util.TimeUtil.fromCloudTime;
+import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.util.TimeUtil;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/OutputReference.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/OutputReference.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/OutputReference.java
new file mode 100644
index 0000000..1b525ac
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/OutputReference.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static com.google.api.client.util.Preconditions.checkNotNull;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.util.Key;
+
+/**
+ * A representation used by {@link com.google.api.services.dataflow.model.Step}s
+ * to reference the output of other {@code Step}s.
+ *
+ * <p>The {@link Key} annotations give the JSON key used for each field
+ * ({@code @type}, {@code step_name}, {@code output_name}).
+ */
+public final class OutputReference extends GenericJson {
+ // Constant type tag, serialized under the "@type" key.
+ @Key("@type")
+ public final String type = "OutputReference";
+
+ // Name of the step whose output is referenced.
+ @Key("step_name")
+ private final String stepName;
+
+ // Name of the referenced output of that step.
+ @Key("output_name")
+ private final String outputName;
+
+ /**
+ * Creates a reference to output {@code outputName} of step {@code stepName}.
+ * Both arguments are passed through {@code checkNotNull}, so neither may be null.
+ */
+ public OutputReference(String stepName, String outputName) {
+ this.stepName = checkNotNull(stepName);
+ this.outputName = checkNotNull(outputName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
new file mode 100644
index 0000000..9e10242
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.VarInt;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.UnsignedBytes;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * An elastic-sized byte array which allows you to manipulate it as a stream, or access
+ * it directly. This allows for a quick succession of moving bytes from an {@link InputStream}
+ * to this wrapper to be used as an {@link OutputStream} and vice versa. This wrapper
+ * also provides random access to bytes stored within. This wrapper allows users to finely
+ * control the number of byte copies that occur.
+ *
+ * Anything stored within the in-memory buffer from offset {@link #size()} is considered temporary
+ * unused storage.
+ */
+@NotThreadSafe
+public class RandomAccessData {
+ /**
+ * A {@link Coder} which encodes the valid parts of this stream.
+ * This follows the same encoding scheme as {@link ByteArrayCoder}.
+ * This coder is deterministic and consistent with equals.
+ *
+ * This coder does not support encoding positive infinity.
+ */
+ public static class RandomAccessDataCoder extends AtomicCoder<RandomAccessData> {
+ private static final RandomAccessDataCoder INSTANCE = new RandomAccessDataCoder();
+
+ @JsonCreator
+ public static RandomAccessDataCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(RandomAccessData value, OutputStream outStream, Coder.Context context)
+ throws CoderException, IOException {
+ if (value == POSITIVE_INFINITY) {
+ throw new CoderException("Positive infinity can not be encoded.");
+ }
+ if (!context.isWholeStream) {
+ VarInt.encode(value.size, outStream);
+ }
+ value.writeTo(outStream, 0, value.size);
+ }
+
+ @Override
+ public RandomAccessData decode(InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ RandomAccessData rval = new RandomAccessData();
+ if (!context.isWholeStream) {
+ int length = VarInt.decodeInt(inStream);
+ rval.readFrom(inStream, 0, length);
+ } else {
+ ByteStreams.copy(inStream, rval.asOutputStream());
+ }
+ return rval;
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(
+ RandomAccessData value, Coder.Context context) {
+ return true;
+ }
+
+ @Override
+ protected long getEncodedElementByteSize(RandomAccessData value, Coder.Context context)
+ throws Exception {
+ if (value == null) {
+ throw new CoderException("cannot encode a null in memory stream");
+ }
+ long size = 0;
+ if (!context.isWholeStream) {
+ size += VarInt.getLength(value.size);
+ }
+ return size + value.size;
+ }
+ }
+
+ public static final UnsignedLexicographicalComparator UNSIGNED_LEXICOGRAPHICAL_COMPARATOR =
+ new UnsignedLexicographicalComparator();
+
+ /**
+ * A {@link Comparator} that compares two byte arrays lexicographically. It compares
+ * values as a list of unsigned bytes. The first pair of values that follow any common prefix,
+ * or when one array is a prefix of the other, treats the shorter array as the lesser.
+ * For example, [] < [0x01] < [0x01, 0x7F] < [0x01, 0x80] < [0x02] < POSITIVE INFINITY.
+ *
+ * <p>Note that a token type of positive infinity is supported and is greater than
+ * all other {@link RandomAccessData}.
+ */
+ public static final class UnsignedLexicographicalComparator
+ implements Comparator<RandomAccessData> {
+ // Do not instantiate
+ private UnsignedLexicographicalComparator() {
+ }
+
+ @Override
+ public int compare(RandomAccessData o1, RandomAccessData o2) {
+ return compare(o1, o2, 0 /* start from the beginning */);
+ }
+
+ /**
+ * Compare the two sets of bytes starting at the given offset.
+ */
+ public int compare(RandomAccessData o1, RandomAccessData o2, int startOffset) {
+ if (o1 == o2) {
+ return 0;
+ }
+ if (o1 == POSITIVE_INFINITY) {
+ return 1;
+ }
+ if (o2 == POSITIVE_INFINITY) {
+ return -1;
+ }
+
+ int minBytesLen = Math.min(o1.size, o2.size);
+ for (int i = startOffset; i < minBytesLen; i++) {
+ // unsigned comparison
+ int b1 = o1.buffer[i] & 0xFF;
+ int b2 = o2.buffer[i] & 0xFF;
+ if (b1 == b2) {
+ continue;
+ }
+ // Return the stream with the smaller byte as the smaller value.
+ return b1 - b2;
+ }
+ // If one is a prefix of the other, return the shorter one as the smaller one.
+ // If both lengths are equal, then both streams are equal.
+ return o1.size - o2.size;
+ }
+
+ /**
+ * Compute the length of the common prefix of the two provided sets of bytes.
+ */
+ public int commonPrefixLength(RandomAccessData o1, RandomAccessData o2) {
+ int minBytesLen = Math.min(o1.size, o2.size);
+ for (int i = 0; i < minBytesLen; i++) {
+ // unsigned comparison
+ int b1 = o1.buffer[i] & 0xFF;
+ int b2 = o2.buffer[i] & 0xFF;
+ if (b1 != b2) {
+ return i;
+ }
+ }
+ return minBytesLen;
+ }
+ }
+
+ /** A token type representing positive infinity. */
+ static final RandomAccessData POSITIVE_INFINITY = new RandomAccessData(0);
+
+ /**
+ * Returns a RandomAccessData that is the smallest value of same length which
+ * is strictly greater than this. Note that if this is empty or is all 0xFF then
+ * a token value of positive infinity is returned.
+ *
+ * The {@link UnsignedLexicographicalComparator} supports comparing {@link RandomAccessData}
+ * with support for positive infinitiy.
+ */
+ public RandomAccessData increment() throws IOException {
+ RandomAccessData copy = copy();
+ for (int i = copy.size - 1; i >= 0; --i) {
+ if (copy.buffer[i] != UnsignedBytes.MAX_VALUE) {
+ copy.buffer[i] = UnsignedBytes.checkedCast(UnsignedBytes.toInt(copy.buffer[i]) + 1);
+ return copy;
+ }
+ }
+ return POSITIVE_INFINITY;
+ }
+
+ private static final int DEFAULT_INITIAL_BUFFER_SIZE = 128;
+
+ /** Constructs a RandomAccessData with a default buffer size. */
+ public RandomAccessData() {
+ this(DEFAULT_INITIAL_BUFFER_SIZE);
+ }
+
+ /** Constructs a RandomAccessData with the initial buffer. */
+ public RandomAccessData(byte[] initialBuffer) {
+ checkNotNull(initialBuffer);
+ this.buffer = initialBuffer;
+ this.size = initialBuffer.length;
+ }
+
+ /** Constructs a RandomAccessData with the given buffer size. */
+ public RandomAccessData(int initialBufferSize) {
+ checkArgument(initialBufferSize >= 0, "Expected initial buffer size to be greater than zero.");
+ this.buffer = new byte[initialBufferSize];
+ }
+
+ private byte[] buffer;
+ private int size;
+
+ /** Returns the backing array. */
+ public byte[] array() {
+ return buffer;
+ }
+
+ /** Returns the number of bytes in the backing array that are valid. */
+ public int size() {
+ return size;
+ }
+
+ /** Resets the end of the stream to the specified position. */
+ public void resetTo(int position) {
+ ensureCapacity(position);
+ size = position;
+ }
+
+ private final OutputStream outputStream = new OutputStream() {
+ @Override
+ public void write(int b) throws IOException {
+ ensureCapacity(size + 1);
+ buffer[size] = (byte) b;
+ size += 1;
+ }
+
+ @Override
+ public void write(byte[] b, int offset, int length) throws IOException {
+ ensureCapacity(size + length);
+ System.arraycopy(b, offset, buffer, size, length);
+ size += length;
+ }
+ };
+
+ /**
+ * Returns an output stream which writes to the backing buffer from the current position.
+ * Note that the internal buffer will grow as required to accomodate all data written.
+ */
+ public OutputStream asOutputStream() {
+ return outputStream;
+ }
+
+ /**
+ * Returns an {@link InputStream} wrapper which supplies the portion of this backing byte buffer
+ * starting at {@code offset} and up to {@code length} bytes. Note that the returned
+ * {@link InputStream} is only a wrapper and any modifications to the underlying
+ * {@link RandomAccessData} will be visible by the {@link InputStream}.
+ */
+ public InputStream asInputStream(final int offset, final int length) {
+ return new ByteArrayInputStream(buffer, offset, length);
+ }
+
+ /**
+ * Writes {@code length} bytes starting at {@code offset} from the backing data store to the
+ * specified output stream.
+ */
+ public void writeTo(OutputStream out, int offset, int length) throws IOException {
+ out.write(buffer, offset, length);
+ }
+
+ /**
+ * Reads {@code length} bytes from the specified input stream writing them into the backing
+ * data store starting at {@code offset}.
+ *
+ * <p>Note that the in memory stream will be grown to ensure there is enough capacity.
+ */
+ public void readFrom(InputStream inStream, int offset, int length) throws IOException {
+ ensureCapacity(offset + length);
+ ByteStreams.readFully(inStream, buffer, offset, length);
+ size = offset + length;
+ }
+
+ /** Returns a copy of this RandomAccessData. */
+ public RandomAccessData copy() throws IOException {
+ RandomAccessData copy = new RandomAccessData(size);
+ writeTo(copy.asOutputStream(), 0, size);
+ return copy;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) {
+ return true;
+ }
+ if (!(other instanceof RandomAccessData)) {
+ return false;
+ }
+ return UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(this, (RandomAccessData) other) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 1;
+ for (int i = 0; i < size; ++i) {
+ result = 31 * result + buffer[i];
+ }
+
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("buffer", Arrays.copyOf(buffer, size))
+ .add("size", size)
+ .toString();
+ }
+
+ private void ensureCapacity(int minCapacity) {
+ // If we have enough space, don't grow the buffer.
+ if (minCapacity <= buffer.length) {
+ return;
+ }
+
+ // Try to double the size of the buffer, if thats not enough, just use the new capacity.
+ // Note that we use Math.min(long, long) to not cause overflow on the multiplication.
+ int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.length * 2L);
+ if (newCapacity < minCapacity) {
+ newCapacity = minCapacity;
+ }
+ buffer = Arrays.copyOf(buffer, newCapacity);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6023d26a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
new file mode 100644
index 0000000..434c83f
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.ReadableDuration;
+import org.joda.time.ReadableInstant;
+import org.joda.time.chrono.ISOChronology;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+/**
+ * A helper class for converting between Dataflow API and SDK time
+ * representations.
+ *
+ * <p>Dataflow API times are strings of the form
+ * {@code YYYY-MM-dd'T'HH:mm:ss[.nnnn]'Z'}: that is, RFC 3339
+ * strings with optional fractional seconds and a 'Z' offset.
+ *
+ * <p>Dataflow API durations are strings of the form {@code ['-']sssss[.nnnn]'s'}:
+ * that is, seconds with optional fractional seconds and a literal 's' at the end.
+ *
+ * <p>In both formats, fractional seconds are either three digits (millisecond
+ * resolution), six digits (microsecond resolution), or nine digits (nanosecond
+ * resolution).
+ */
+public final class TimeUtil {
+ private TimeUtil() {} // Non-instantiable.
+
+ private static final Pattern DURATION_PATTERN = Pattern.compile("(\\d+)(?:\\.(\\d+))?s");
+ private static final Pattern TIME_PATTERN =
+ Pattern.compile("(\\d{4})-(\\d{2})-(\\d{2})T(\\d{2}):(\\d{2}):(\\d{2})(?:\\.(\\d+))?Z");
+
+ /**
+ * Converts a {@link ReadableInstant} into a Dateflow API time value.
+ */
+ public static String toCloudTime(ReadableInstant instant) {
+ // Note that since Joda objects use millisecond resolution, we always
+ // produce either no fractional seconds or fractional seconds with
+ // millisecond resolution.
+
+ // Translate the ReadableInstant to a DateTime with ISOChronology.
+ DateTime time = new DateTime(instant);
+
+ int millis = time.getMillisOfSecond();
+ if (millis == 0) {
+ return String.format("%04d-%02d-%02dT%02d:%02d:%02dZ",
+ time.getYear(),
+ time.getMonthOfYear(),
+ time.getDayOfMonth(),
+ time.getHourOfDay(),
+ time.getMinuteOfHour(),
+ time.getSecondOfMinute());
+ } else {
+ return String.format("%04d-%02d-%02dT%02d:%02d:%02d.%03dZ",
+ time.getYear(),
+ time.getMonthOfYear(),
+ time.getDayOfMonth(),
+ time.getHourOfDay(),
+ time.getMinuteOfHour(),
+ time.getSecondOfMinute(),
+ millis);
+ }
+ }
+
+ /**
+ * Converts a time value received via the Dataflow API into the corresponding
+ * {@link Instant}.
+ * @return the parsed time, or null if a parse error occurs
+ */
+ @Nullable
+ public static Instant fromCloudTime(String time) {
+ Matcher matcher = TIME_PATTERN.matcher(time);
+ if (!matcher.matches()) {
+ return null;
+ }
+ int year = Integer.valueOf(matcher.group(1));
+ int month = Integer.valueOf(matcher.group(2));
+ int day = Integer.valueOf(matcher.group(3));
+ int hour = Integer.valueOf(matcher.group(4));
+ int minute = Integer.valueOf(matcher.group(5));
+ int second = Integer.valueOf(matcher.group(6));
+ int millis = 0;
+
+ String frac = matcher.group(7);
+ if (frac != null) {
+ int fracs = Integer.valueOf(frac);
+ if (frac.length() == 3) { // millisecond resolution
+ millis = fracs;
+ } else if (frac.length() == 6) { // microsecond resolution
+ millis = fracs / 1000;
+ } else if (frac.length() == 9) { // nanosecond resolution
+ millis = fracs / 1000000;
+ } else {
+ return null;
+ }
+ }
+
+ return new DateTime(year, month, day, hour, minute, second, millis,
+ ISOChronology.getInstanceUTC()).toInstant();
+ }
+
+ /**
+ * Converts a {@link ReadableDuration} into a Dataflow API duration string.
+ */
+ public static String toCloudDuration(ReadableDuration duration) {
+ // Note that since Joda objects use millisecond resolution, we always
+ // produce either no fractional seconds or fractional seconds with
+ // millisecond resolution.
+ long millis = duration.getMillis();
+ long seconds = millis / 1000;
+ millis = millis % 1000;
+ if (millis == 0) {
+ return String.format("%ds", seconds);
+ } else {
+ return String.format("%d.%03ds", seconds, millis);
+ }
+ }
+
+ /**
+ * Converts a Dataflow API duration string into a {@link Duration}.
+ * @return the parsed duration, or null if a parse error occurs
+ */
+ @Nullable
+ public static Duration fromCloudDuration(String duration) {
+ Matcher matcher = DURATION_PATTERN.matcher(duration);
+ if (!matcher.matches()) {
+ return null;
+ }
+ long millis = Long.valueOf(matcher.group(1)) * 1000;
+ String frac = matcher.group(2);
+ if (frac != null) {
+ long fracs = Long.valueOf(frac);
+ if (frac.length() == 3) { // millisecond resolution
+ millis += fracs;
+ } else if (frac.length() == 6) { // microsecond resolution
+ millis += fracs / 1000;
+ } else if (frac.length() == 9) { // nanosecond resolution
+ millis += fracs / 1000000;
+ } else {
+ return null;
+ }
+ }
+ return Duration.millis(millis);
+ }
+}