You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/03 19:47:36 UTC

[1/3] beam git commit: [BEAM-1871] Move GCP specific serialization CloudObject and supporting translation code to Dataflow runner module

Repository: beam
Updated Branches:
  refs/heads/master 320f9affb -> aafa1bba9


http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java
deleted file mode 100644
index 061e56a..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.util.Key;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * A representation of an arbitrary Java object to be instantiated by Dataflow
- * workers.
- *
- * <p>Typically, an object to be written by the SDK to the Dataflow service will
- * implement a method (typically called {@code asCloudObject()}) that returns a
- * {@code CloudObject} to represent the object in the protocol.  Once the
- * {@code CloudObject} is constructed, the method should explicitly add
- * additional properties to be presented during deserialization, representing
- * child objects by building additional {@code CloudObject}s.
- *
- * @deprecated replaced by {@code org.apache.beam.runners.dataflow.util.CloudObject}
- */
-@Deprecated
-public final class CloudObject extends GenericJson {
-  /**
-   * Constructs a {@code CloudObject} by copying the supplied serialized object
-   * spec, which must represent an SDK object serialized for transport via the
-   * Dataflow API.
-   *
-   * <p>The most common use of this method is during deserialization on the worker,
-   * where it's used as a binding type during instance construction.
-   *
-   * @param spec supplies the serialized form of the object as a nested map
-   * @throws RuntimeException if the supplied map does not represent an SDK object
-   */
-  public static CloudObject fromSpec(Map<String, Object> spec) {
-    CloudObject result = new CloudObject();
-    result.putAll(spec);
-    if (result.className == null) {
-      throw new RuntimeException("Unable to create an SDK object from " + spec
-          + ": Object class not specified (missing \""
-          + PropertyNames.OBJECT_TYPE_NAME + "\" field)");
-    }
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} to be used for serializing an instance of
-   * the supplied class for transport via the Dataflow API.  The instance
-   * parameters to be serialized must be supplied explicitly after the
-   * {@code CloudObject} is created, by using {@link CloudObject#put}.
-   *
-   * @param cls the class to use when deserializing the object on the worker
-   */
-  public static CloudObject forClass(Class<?> cls) {
-    CloudObject result = new CloudObject();
-    result.className = checkNotNull(cls).getName();
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} to be used for serializing data to be
-   * deserialized using the supplied class name for
-   * transport via the Dataflow API.  The instance parameters to be serialized
-   * must be supplied explicitly after the {@code CloudObject} is created, by
-   * using {@link CloudObject#put}.
-   *
-   * @param className the class to use when deserializing the object on the worker
-   */
-  public static CloudObject forClassName(String className) {
-    CloudObject result = new CloudObject();
-    result.className = checkNotNull(className);
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} representing the given value.
-   * @param value the scalar value to represent.
-   */
-  public static CloudObject forString(String value) {
-    CloudObject result = forClassName(CloudKnownType.TEXT.getUri());
-    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} representing the given value.
-   * @param value the scalar value to represent.
-   */
-  public static CloudObject forBoolean(Boolean value) {
-    CloudObject result = forClassName(CloudKnownType.BOOLEAN.getUri());
-    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} representing the given value.
-   * @param value the scalar value to represent.
-   */
-  public static CloudObject forInteger(Long value) {
-    CloudObject result = forClassName(CloudKnownType.INTEGER.getUri());
-    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} representing the given value.
-   * @param value the scalar value to represent.
-   */
-  public static CloudObject forInteger(Integer value) {
-    CloudObject result = forClassName(CloudKnownType.INTEGER.getUri());
-    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} representing the given value.
-   * @param value the scalar value to represent.
-   */
-  public static CloudObject forFloat(Float value) {
-    CloudObject result = forClassName(CloudKnownType.FLOAT.getUri());
-    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} representing the given value.
-   * @param value the scalar value to represent.
-   */
-  public static CloudObject forFloat(Double value) {
-    CloudObject result = forClassName(CloudKnownType.FLOAT.getUri());
-    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} representing the given value of a
-   * well-known cloud object type.
-   * @param value the scalar value to represent.
-   * @throws RuntimeException if the value does not have a
-   * {@link CloudKnownType} mapping
-   */
-  public static CloudObject forKnownType(Object value) {
-    @Nullable CloudKnownType ty = CloudKnownType.forClass(value.getClass());
-    if (ty == null) {
-      throw new RuntimeException("Unable to represent value via the Dataflow API: " + value);
-    }
-    CloudObject result = forClassName(ty.getUri());
-    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
-    return result;
-  }
-
-  @Key(PropertyNames.OBJECT_TYPE_NAME)
-  private String className;
-
-  private CloudObject() {}
-
-  /**
-   * Gets the name of the Java class that this CloudObject represents.
-   */
-  public String getClassName() {
-    return className;
-  }
-
-  @Override
-  public CloudObject clone() {
-    return (CloudObject) super.clone();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
index 2d21561..3380a10 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
@@ -17,19 +17,8 @@
  */
 package org.apache.beam.sdk.util;
 
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
-import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
-import com.fasterxml.jackson.databind.DatabindContext;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
-import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.google.api.client.util.Base64;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -37,15 +26,8 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.ref.SoftReference;
 import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.TypeVariable;
-import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.LengthPrefixCoder;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -54,17 +36,6 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 public final class CoderUtils {
   private CoderUtils() {}  // Non-instantiable
 
-  /** A mapping from well known coder types to their implementing classes. */
-  private static final Map<String, Class<?>> WELL_KNOWN_CODER_TYPES =
-      ImmutableMap.<String, Class<?>>builder()
-      .put("kind:pair", KvCoder.class)
-      .put("kind:stream", IterableCoder.class)
-      .put("kind:global_window", GlobalWindow.Coder.class)
-      .put("kind:interval_window", IntervalWindow.IntervalWindowCoder.class)
-      .put("kind:length_prefix", LengthPrefixCoder.class)
-      .put("kind:windowed_value", WindowedValue.FullWindowedValueCoder.class)
-      .build();
-
   private static ThreadLocal<SoftReference<ExposedByteArrayOutputStream>>
       threadLocalOutputStream = new ThreadLocal<>();
 
@@ -214,92 +185,4 @@ public final class CoderUtils {
     TypeDescriptor codedType = TypeDescriptor.of(coderType.getActualTypeArguments()[0]);
     return codedType;
   }
-
-  /**
-   * A {@link com.fasterxml.jackson.databind.Module} that adds the type
-   * resolver needed for Coder definitions.
-   *
-   * <p>Used only in {@link Serializer}, which will move modules.
-   */
-  @Deprecated
-  static final class Jackson2Module extends SimpleModule {
-    /**
-     * The Coder custom type resolver.
-     *
-     * <p>This resolver resolves coders. If the Coder ID is a particular
-     * well-known identifier, it's replaced with the corresponding class.
-     * All other Coder instances are resolved by class name, using the package
-     * org.apache.beam.sdk.coders if there are no "."s in the ID.
-     */
-    private static final class Resolver extends TypeIdResolverBase {
-      @SuppressWarnings("unused") // Used via @JsonTypeIdResolver annotation on Mixin
-      public Resolver() {
-        super(TypeFactory.defaultInstance().constructType(Coder.class),
-            TypeFactory.defaultInstance());
-      }
-
-      @Override
-      public JavaType typeFromId(DatabindContext context, String id) {
-        Class<?> clazz = getClassForId(id);
-        @SuppressWarnings("rawtypes")
-        TypeVariable[] tvs = clazz.getTypeParameters();
-        JavaType[] types = new JavaType[tvs.length];
-        for (int lupe = 0; lupe < tvs.length; lupe++) {
-          types[lupe] = TypeFactory.unknownType();
-        }
-        return _typeFactory.constructSimpleType(clazz, types);
-      }
-
-      private Class<?> getClassForId(String id) {
-        try {
-          if (id.contains(".")) {
-            return Class.forName(id);
-          }
-
-          if (WELL_KNOWN_CODER_TYPES.containsKey(id)) {
-            return WELL_KNOWN_CODER_TYPES.get(id);
-          }
-
-          // Otherwise, see if the ID is the name of a class in
-          // org.apache.beam.sdk.coders.  We do this via creating
-          // the class object so that class loaders have a chance to get
-          // involved -- and since we need the class object anyway.
-          return Class.forName(Coder.class.getPackage().getName() + "." + id);
-        } catch (ClassNotFoundException e) {
-          throw new RuntimeException("Unable to convert coder ID " + id + " to class", e);
-        }
-      }
-
-      @Override
-      public String idFromValueAndType(Object o, Class<?> clazz) {
-        return clazz.getName();
-      }
-
-      @Override
-      public String idFromValue(Object o) {
-        return o.getClass().getName();
-      }
-
-      @Override
-      public JsonTypeInfo.Id getMechanism() {
-        return JsonTypeInfo.Id.CUSTOM;
-      }
-    }
-
-    /**
-     * The mixin class defining how Coders are handled by the deserialization
-     * {@link ObjectMapper}.
-     *
-     * <p>This is done via a mixin so that this resolver is <i>only</i> used
-     * during deserialization requested by the Apache Beam SDK.
-     */
-    @JsonTypeIdResolver(Resolver.class)
-    @JsonTypeInfo(use = Id.CUSTOM, include = As.PROPERTY, property = PropertyNames.OBJECT_TYPE_NAME)
-    private static final class Mixin {}
-
-    public Jackson2Module() {
-      super("BeamCoders");
-      setMixInAnnotation(Coder.class, Mixin.class);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
deleted file mode 100644
index aa5855b..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-/**
- * Constant property names used by the SDK in CloudWorkflow specifications.
- */
-public class PropertyNames {
-  public static final String ALLOWED_ENCODINGS = "allowed_encodings";
-  public static final String APPEND_TRAILING_NEWLINES = "append_trailing_newlines";
-  public static final String BIGQUERY_CREATE_DISPOSITION = "create_disposition";
-  public static final String BIGQUERY_DATASET = "dataset";
-  public static final String BIGQUERY_PROJECT = "project";
-  public static final String BIGQUERY_SCHEMA = "schema";
-  public static final String BIGQUERY_TABLE = "table";
-  public static final String BIGQUERY_QUERY = "bigquery_query";
-  public static final String BIGQUERY_FLATTEN_RESULTS = "bigquery_flatten_results";
-  public static final String BIGQUERY_USE_LEGACY_SQL = "bigquery_use_legacy_sql";
-  public static final String BIGQUERY_WRITE_DISPOSITION = "write_disposition";
-  public static final String BIGQUERY_EXPORT_FORMAT = "bigquery_export_format";
-  public static final String BIGQUERY_EXPORT_SCHEMA = "bigquery_export_schema";
-  public static final String CO_GBK_RESULT_SCHEMA = "co_gbk_result_schema";
-  public static final String COMBINE_FN = "combine_fn";
-  public static final String COMPONENT_ENCODINGS = "component_encodings";
-  public static final String COMPRESSION_TYPE = "compression_type";
-  public static final String CUSTOM_SOURCE_FORMAT = "custom_source";
-  public static final String CONCAT_SOURCE_SOURCES = "sources";
-  public static final String CONCAT_SOURCE_BASE_SPECS = "base_specs";
-  public static final String SOURCE_STEP_INPUT = "custom_source_step_input";
-  public static final String SOURCE_SPEC = "spec";
-  public static final String SOURCE_METADATA = "metadata";
-  public static final String SOURCE_DOES_NOT_NEED_SPLITTING = "does_not_need_splitting";
-  public static final String SOURCE_PRODUCES_SORTED_KEYS = "produces_sorted_keys";
-  public static final String SOURCE_IS_INFINITE = "is_infinite";
-  public static final String SOURCE_ESTIMATED_SIZE_BYTES = "estimated_size_bytes";
-  public static final String ELEMENT = "element";
-  public static final String ELEMENTS = "elements";
-  public static final String ENCODING = "encoding";
-  public static final String ENCODING_ID = "encoding_id";
-  public static final String END_INDEX = "end_index";
-  public static final String END_OFFSET = "end_offset";
-  public static final String END_SHUFFLE_POSITION = "end_shuffle_position";
-  public static final String ENVIRONMENT_VERSION_JOB_TYPE_KEY = "job_type";
-  public static final String ENVIRONMENT_VERSION_MAJOR_KEY = "major";
-  public static final String FILENAME = "filename";
-  public static final String FILENAME_PREFIX = "filename_prefix";
-  public static final String FILENAME_SUFFIX = "filename_suffix";
-  public static final String FILEPATTERN = "filepattern";
-  public static final String FOOTER = "footer";
-  public static final String FORMAT = "format";
-  public static final String HEADER = "header";
-  public static final String INPUTS = "inputs";
-  public static final String INPUT_CODER = "input_coder";
-  public static final String IS_GENERATED = "is_generated";
-  public static final String IS_MERGING_WINDOW_FN = "is_merging_window_fn";
-  public static final String IS_PAIR_LIKE = "is_pair_like";
-  public static final String IS_STREAM_LIKE = "is_stream_like";
-  public static final String IS_WRAPPER = "is_wrapper";
-  public static final String DISALLOW_COMBINER_LIFTING = "disallow_combiner_lifting";
-  public static final String NON_PARALLEL_INPUTS = "non_parallel_inputs";
-  public static final String NUM_SHARD_CODERS = "num_shard_coders";
-  public static final String NUM_METADATA_SHARD_CODERS = "num_metadata_shard_coders";
-  public static final String NUM_SHARDS = "num_shards";
-  public static final String OBJECT_TYPE_NAME = "@type";
-  public static final String OUTPUT = "output";
-  public static final String OUTPUT_INFO = "output_info";
-  public static final String OUTPUT_NAME = "output_name";
-  public static final String PARALLEL_INPUT = "parallel_input";
-  public static final String PHASE = "phase";
-  public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label";
-  public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = "pubsub_serialized_attributes_fn";
-  public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
-  public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override";
-  public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = "pubsub_timestamp_label";
-  public static final String PUBSUB_TOPIC = "pubsub_topic";
-  public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override";
-  public static final String SCALAR_FIELD_NAME = "value";
-  public static final String SERIALIZED_FN = "serialized_fn";
-  public static final String SHARD_NAME_TEMPLATE = "shard_template";
-  public static final String SHUFFLE_KIND = "shuffle_kind";
-  public static final String SHUFFLE_READER_CONFIG = "shuffle_reader_config";
-  public static final String SHUFFLE_WRITER_CONFIG = "shuffle_writer_config";
-  public static final String SORT_VALUES = "sort_values";
-  public static final String START_INDEX = "start_index";
-  public static final String START_OFFSET = "start_offset";
-  public static final String START_SHUFFLE_POSITION = "start_shuffle_position";
-  public static final String STRIP_TRAILING_NEWLINES = "strip_trailing_newlines";
-  public static final String TUPLE_TAGS = "tuple_tags";
-  public static final String USE_INDEXED_FORMAT = "use_indexed_format";
-  public static final String USER_FN = "user_fn";
-  public static final String USER_NAME = "user_name";
-  public static final String USES_KEYED_STATE = "uses_keyed_state";
-  public static final String VALIDATE_SINK = "validate_sink";
-  public static final String VALIDATE_SOURCE = "validate_source";
-  public static final String VALUE = "value";
-  public static final String DISPLAY_DATA = "display_data";
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java
deleted file mode 100644
index 166e4e7..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * Utility for converting objects between Java and Cloud representations.
- *
- * @deprecated replaced by {@code org.apache.beam.runners.dataflow.util.Serializer}
- */
-@Deprecated
-public final class Serializer {
-  // Delay initialization of statics until the first call to Serializer.
-  private static class SingletonHelper {
-    static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
-    static final ObjectMapper TREE_MAPPER = createTreeMapper();
-
-    /**
-     * Creates the object mapper that will be used for serializing Google API
-     * client maps into Jackson trees.
-     */
-    private static ObjectMapper createTreeMapper() {
-      return new ObjectMapper();
-    }
-
-    /**
-     * Creates the object mapper that will be used for deserializing Jackson
-     * trees into objects.
-     */
-    private static ObjectMapper createObjectMapper() {
-      ObjectMapper m = new ObjectMapper();
-      // Ignore properties that are not used by the object.
-      m.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
-
-      // For parameters of type Object, use the @type property to determine the
-      // class to instantiate.
-      //
-      // TODO: It would be ideal to do this for all non-final classes.  The
-      // problem with using DefaultTyping.NON_FINAL is that it insists on having
-      // type information in the JSON for classes with useful default
-      // implementations, such as List.  Ideally, we'd combine these defaults
-      // with available type information if that information's present.
-      m.enableDefaultTypingAsProperty(
-           ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT,
-           PropertyNames.OBJECT_TYPE_NAME);
-
-      m.registerModule(new CoderUtils.Jackson2Module());
-
-      return m;
-    }
-  }
-
-  /**
-   * Deserializes an object from a Dataflow structured encoding (represented in
-   * Java as a map).
-   *
-   * <p>The standard Dataflow SDK object serialization protocol is based on JSON.
-   * Data is typically encoded as a JSON object whose fields represent the
-   * object's data.
-   *
-   * <p>The actual deserialization is performed by Jackson, which can deserialize
-   * public fields, use JavaBean setters, or use injection annotations to
-   * indicate how to construct the object.  The {@link ObjectMapper} used is
-   * configured to use the "@type" field as the name of the class to instantiate
-   * (supporting polymorphic types), and may be further configured by
-   * annotations or via {@link ObjectMapper#registerModule}.
-   *
-   * @see <a href="http://wiki.fasterxml.com/JacksonFAQ#Data_Binding.2C_general">
-   * Jackson Data-Binding</a>
-   * @see <a href="https://github.com/FasterXML/jackson-annotations/wiki/Jackson-Annotations">
-   * Jackson-Annotations</a>
-   * @param serialized the object in untyped decoded form (i.e. a nested {@link Map})
-   * @param clazz the expected object class
-   */
-  public static <T> T deserialize(Map<String, Object> serialized, Class<T> clazz) {
-    try {
-      return SingletonHelper.OBJECT_MAPPER.treeToValue(
-          SingletonHelper.TREE_MAPPER.valueToTree(
-              deserializeCloudKnownTypes(serialized)),
-          clazz);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(
-          "Unable to deserialize class " + clazz, e);
-    }
-  }
-
-  /**
-   * Recursively walks the supplied map, looking for well-known cloud type
-   * information (keyed as {@link PropertyNames#OBJECT_TYPE_NAME}, matching a
-   * URI value from the {@link CloudKnownType} enum).  Upon finding this type
-   * information, it converts it into the correspondingly typed Java value.
-   */
-  @SuppressWarnings("unchecked")
-  private static Object deserializeCloudKnownTypes(Object src) {
-    if (src instanceof Map) {
-      Map<String, Object> srcMap = (Map<String, Object>) src;
-      @Nullable Object value = srcMap.get(PropertyNames.SCALAR_FIELD_NAME);
-      @Nullable CloudKnownType type =
-          CloudKnownType.forUri((String) srcMap.get(PropertyNames.OBJECT_TYPE_NAME));
-      if (type != null && value != null) {
-        // It's a value of a well-known cloud type; let the known type handler
-        // handle the translation.
-        Object result = type.parse(value, type.defaultClass());
-        return result;
-      }
-      // Otherwise, it's just an ordinary map.
-      Map<String, Object> dest = new HashMap<>(srcMap.size());
-      for (Map.Entry<String, Object> entry : srcMap.entrySet()) {
-        dest.put(entry.getKey(), deserializeCloudKnownTypes(entry.getValue()));
-      }
-      return dest;
-    }
-    if (src instanceof List) {
-      List<Object> srcList = (List<Object>) src;
-      List<Object> dest = new ArrayList<>(srcList.size());
-      for (Object obj : srcList) {
-        dest.add(deserializeCloudKnownTypes(obj));
-      }
-      return dest;
-    }
-    // Neither a Map nor a List; no translation needed.
-    return src;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java
deleted file mode 100644
index a4be054..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.api.client.util.Data;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * A collection of static methods for manipulating data structure representations transferred via
- * the Dataflow API.
- */
-public final class Structs {
-  private Structs() {} // Non-instantiable
-
-  public static String getString(Map<String, Object> map, String name) {
-    return getValue(map, name, String.class, "a string");
-  }
-
-  public static String getString(
-      Map<String, Object> map, String name, @Nullable String defaultValue) {
-    return getValue(map, name, String.class, "a string", defaultValue);
-  }
-
-  public static byte[] getBytes(Map<String, Object> map, String name) {
-    @Nullable byte[] result = getBytes(map, name, null);
-    if (result == null) {
-      throw new ParameterNotFoundException(name, map);
-    }
-    return result;
-  }
-
-  @Nullable
-  public static byte[] getBytes(
-      Map<String, Object> map, String name, @Nullable byte[] defaultValue) {
-    @Nullable String jsonString = getString(map, name, null);
-    if (jsonString == null) {
-      return defaultValue;
-    }
-    // TODO: Need to agree on a format for encoding bytes in
-    // a string that can be sent to the backend, over the cloud
-    // map task work API.  base64 encoding seems pretty common.  Switch to it?
-    return StringUtils.jsonStringToByteArray(jsonString);
-  }
-
-  public static Boolean getBoolean(Map<String, Object> map, String name) {
-    return getValue(map, name, Boolean.class, "a boolean");
-  }
-
-  @Nullable
-  public static Boolean getBoolean(
-      Map<String, Object> map, String name, @Nullable Boolean defaultValue) {
-    return getValue(map, name, Boolean.class, "a boolean", defaultValue);
-  }
-
-  public static Long getLong(Map<String, Object> map, String name) {
-    return getValue(map, name, Long.class, "a long");
-  }
-
-  @Nullable
-  public static Long getLong(Map<String, Object> map, String name, @Nullable Long defaultValue) {
-    return getValue(map, name, Long.class, "a long", defaultValue);
-  }
-
-  public static Integer getInt(Map<String, Object> map, String name) {
-    return getValue(map, name, Integer.class, "an int");
-  }
-
-  @Nullable
-  public static Integer getInt(
-      Map<String, Object> map, String name, @Nullable Integer defaultValue) {
-    return getValue(map, name, Integer.class, "an int", defaultValue);
-  }
-
-  @Nullable
-  public static List<String> getStrings(
-      Map<String, Object> map, String name, @Nullable List<String> defaultValue) {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, "a string or a list");
-      }
-      return defaultValue;
-    }
-    if (Data.isNull(value)) {
-      // This is a JSON literal null.  When represented as a list of strings,
-      // this is an empty list.
-      return Collections.<String>emptyList();
-    }
-    @Nullable String singletonString = decodeValue(value, String.class);
-    if (singletonString != null) {
-      return Collections.singletonList(singletonString);
-    }
-    if (!(value instanceof List)) {
-      throw new IncorrectTypeException(name, map, "a string or a list");
-    }
-    @SuppressWarnings("unchecked")
-    List<Object> elements = (List<Object>) value;
-    List<String> result = new ArrayList<>(elements.size());
-    for (Object o : elements) {
-      @Nullable String s = decodeValue(o, String.class);
-      if (s == null) {
-        throw new IncorrectTypeException(name, map, "a list of strings");
-      }
-      result.add(s);
-    }
-    return result;
-  }
-
-  public static Map<String, Object> getObject(Map<String, Object> map, String name) {
-    @Nullable Map<String, Object> result = getObject(map, name, null);
-    if (result == null) {
-      throw new ParameterNotFoundException(name, map);
-    }
-    return result;
-  }
-
-  @Nullable
-  public static Map<String, Object> getObject(
-      Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue) {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, "an object");
-      }
-      return defaultValue;
-    }
-    return checkObject(value, map, name);
-  }
-
-  private static Map<String, Object> checkObject(
-      Object value, Map<String, Object> map, String name) {
-    if (Data.isNull(value)) {
-      // This is a JSON literal null.  When represented as an object, this is an
-      // empty map.
-      return Collections.<String, Object>emptyMap();
-    }
-    if (!(value instanceof Map)) {
-      throw new IncorrectTypeException(name, map, "an object (not a map)");
-    }
-    @SuppressWarnings("unchecked")
-    Map<String, Object> mapValue = (Map<String, Object>) value;
-    if (!mapValue.containsKey(PropertyNames.OBJECT_TYPE_NAME)) {
-      throw new IncorrectTypeException(
-          name, map, "an object (no \"" + PropertyNames.OBJECT_TYPE_NAME + "\" field)");
-    }
-    return mapValue;
-  }
-
-  @Nullable
-  public static List<Map<String, Object>> getListOfMaps(
-      Map<String, Object> map, String name, @Nullable List<Map<String, Object>> defaultValue) {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, "a list");
-      }
-      return defaultValue;
-    }
-    if (Data.isNull(value)) {
-      // This is a JSON literal null.  When represented as a list,
-      // this is an empty list.
-      return Collections.<Map<String, Object>>emptyList();
-    }
-
-    if (!(value instanceof List)) {
-      throw new IncorrectTypeException(name, map, "a list");
-    }
-
-    List<?> elements = (List<?>) value;
-    for (Object elem : elements) {
-      if (!(elem instanceof Map)) {
-        throw new IncorrectTypeException(name, map, "a list of Map objects");
-      }
-    }
-
-    @SuppressWarnings("unchecked")
-    List<Map<String, Object>> result = (List<Map<String, Object>>) elements;
-    return result;
-  }
-
-  public static Map<String, Object> getDictionary(Map<String, Object> map, String name) {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      throw new ParameterNotFoundException(name, map);
-    }
-    if (Data.isNull(value)) {
-      // This is a JSON literal null.  When represented as a dictionary, this is
-      // an empty map.
-      return Collections.<String, Object>emptyMap();
-    }
-    if (!(value instanceof Map)) {
-      throw new IncorrectTypeException(name, map, "a dictionary");
-    }
-    @SuppressWarnings("unchecked")
-    Map<String, Object> result = (Map<String, Object>) value;
-    return result;
-  }
-
-  @Nullable
-  public static Map<String, Object> getDictionary(
-      Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue) {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, "a dictionary");
-      }
-      return defaultValue;
-    }
-    if (Data.isNull(value)) {
-      // This is a JSON literal null.  When represented as a dictionary, this is
-      // an empty map.
-      return Collections.<String, Object>emptyMap();
-    }
-    if (!(value instanceof Map)) {
-      throw new IncorrectTypeException(name, map, "a dictionary");
-    }
-    @SuppressWarnings("unchecked")
-    Map<String, Object> result = (Map<String, Object>) value;
-    return result;
-  }
-
-  // Builder operations.
-
-  public static void addString(Map<String, Object> map, String name, String value) {
-    addObject(map, name, CloudObject.forString(value));
-  }
-
-  public static void addBoolean(Map<String, Object> map, String name, boolean value) {
-    addObject(map, name, CloudObject.forBoolean(value));
-  }
-
-  public static void addLong(Map<String, Object> map, String name, long value) {
-    addObject(map, name, CloudObject.forInteger(value));
-  }
-
-  public static void addObject(Map<String, Object> map, String name, Map<String, Object> value) {
-    map.put(name, value);
-  }
-
-  public static void addNull(Map<String, Object> map, String name) {
-    map.put(name, Data.nullOf(Object.class));
-  }
-
-  public static void addLongs(Map<String, Object> map, String name, long... longs) {
-    List<Map<String, Object>> elements = new ArrayList<>(longs.length);
-    for (Long value : longs) {
-      elements.add(CloudObject.forInteger(value));
-    }
-    map.put(name, elements);
-  }
-
-  public static void addList(
-      Map<String, Object> map, String name, List<? extends Map<String, Object>> elements) {
-    map.put(name, elements);
-  }
-
-  public static void addStringList(Map<String, Object> map, String name, List<String> elements) {
-    ArrayList<CloudObject> objects = new ArrayList<>(elements.size());
-    for (String element : elements) {
-      objects.add(CloudObject.forString(element));
-    }
-    addList(map, name, objects);
-  }
-
-  public static <T extends Map<String, Object>> void addList(
-      Map<String, Object> map, String name, T[] elements) {
-    map.put(name, Arrays.asList(elements));
-  }
-
-  public static void addDictionary(
-      Map<String, Object> map, String name, Map<String, Object> value) {
-    map.put(name, value);
-  }
-
-  public static void addDouble(Map<String, Object> map, String name, Double value) {
-    addObject(map, name, CloudObject.forFloat(value));
-  }
-
-  // Helper methods for a few of the accessor methods.
-
-  private static <T> T getValue(Map<String, Object> map, String name, Class<T> clazz, String type) {
-    @Nullable T result = getValue(map, name, clazz, type, null);
-    if (result == null) {
-      throw new ParameterNotFoundException(name, map);
-    }
-    return result;
-  }
-
-  @Nullable
-  private static <T> T getValue(
-      Map<String, Object> map, String name, Class<T> clazz, String type, @Nullable T defaultValue) {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, type);
-      }
-      return defaultValue;
-    }
-    T result = decodeValue(value, clazz);
-    if (result == null) {
-      // The value exists, but can't be decoded.
-      throw new IncorrectTypeException(name, map, type);
-    }
-    return result;
-  }
-
-  @Nullable
-  private static <T> T decodeValue(Object value, Class<T> clazz) {
-    try {
-      if (value.getClass() == clazz) {
-        // decodeValue() is only called for final classes; if the class matches,
-        // it's safe to just return the value, and if it doesn't match, decoding
-        // is needed.
-        return clazz.cast(value);
-      }
-      if (!(value instanceof Map)) {
-        return null;
-      }
-      @SuppressWarnings("unchecked")
-      Map<String, Object> map = (Map<String, Object>) value;
-      @Nullable String typeName = (String) map.get(PropertyNames.OBJECT_TYPE_NAME);
-      if (typeName == null) {
-        return null;
-      }
-      @Nullable CloudKnownType knownType = CloudKnownType.forUri(typeName);
-      if (knownType == null) {
-        return null;
-      }
-      @Nullable Object scalar = map.get(PropertyNames.SCALAR_FIELD_NAME);
-      if (scalar == null) {
-        return null;
-      }
-      return knownType.parse(scalar, clazz);
-    } catch (ClassCastException e) {
-      // If any class cast fails during decoding, the value's not decodable.
-      return null;
-    }
-  }
-
-  private static final class ParameterNotFoundException extends RuntimeException {
-    public ParameterNotFoundException(String name, Map<String, Object> map) {
-      super("didn't find required parameter " + name + " in " + map);
-    }
-  }
-
-  private static final class IncorrectTypeException extends RuntimeException {
-    public IncorrectTypeException(String name, Map<String, Object> map, String type) {
-      super("required parameter " + name + " in " + map + " not " + type);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Values.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Values.java
deleted file mode 100644
index d8aa046..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Values.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * A collection of static methods for manipulating value representations
- * transferred via the Dataflow API.
- */
-public final class Values {
-  private Values() {}  // Non-instantiable
-
-  public static Boolean asBoolean(Object value) throws ClassCastException {
-    @Nullable Boolean knownResult = checkKnownValue(CloudKnownType.BOOLEAN, value, Boolean.class);
-    if (knownResult != null) {
-      return knownResult;
-    }
-    return Boolean.class.cast(value);
-  }
-
-  public static Double asDouble(Object value) throws ClassCastException {
-    @Nullable Double knownResult = checkKnownValue(CloudKnownType.FLOAT, value, Double.class);
-    if (knownResult != null) {
-      return knownResult;
-    }
-    if (value instanceof Double) {
-      return (Double) value;
-    }
-    return ((Float) value).doubleValue();
-  }
-
-  public static Long asLong(Object value) throws ClassCastException {
-    @Nullable Long knownResult = checkKnownValue(CloudKnownType.INTEGER, value, Long.class);
-    if (knownResult != null) {
-      return knownResult;
-    }
-    if (value instanceof Long) {
-      return (Long) value;
-    }
-    return ((Integer) value).longValue();
-  }
-
-  public static String asString(Object value) throws ClassCastException {
-    @Nullable String knownResult = checkKnownValue(CloudKnownType.TEXT, value, String.class);
-    if (knownResult != null) {
-      return knownResult;
-    }
-    return String.class.cast(value);
-  }
-
-  @Nullable
-  private static <T> T checkKnownValue(CloudKnownType type, Object value, Class<T> clazz) {
-    if (!(value instanceof Map)) {
-      return null;
-    }
-    Map<String, Object> map = (Map<String, Object>) value;
-    @Nullable String typeName = (String) map.get(PropertyNames.OBJECT_TYPE_NAME);
-    if (typeName == null) {
-      return null;
-    }
-    @Nullable CloudKnownType knownType = CloudKnownType.forUri(typeName);
-    if (knownType == null || knownType != type) {
-      return null;
-    }
-    @Nullable Object scalar = map.get(PropertyNames.SCALAR_FIELD_NAME);
-    if (scalar == null) {
-      return null;
-    }
-    return knownType.parse(scalar, clazz);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 23666ca..13e499d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -17,11 +17,8 @@
  */
 package org.apache.beam.sdk.util;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
@@ -610,17 +607,6 @@ public abstract class WindowedValue<T> {
       return new FullWindowedValueCoder<>(valueCoder, windowCoder);
     }
 
-    @JsonCreator
-    public static FullWindowedValueCoder<?> of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<?>> components) {
-      checkArgument(components.size() == 2,
-                    "Expecting 2 components, got " + components.size());
-      @SuppressWarnings("unchecked")
-      Coder<? extends BoundedWindow> window = (Coder<? extends BoundedWindow>) components.get(1);
-      return of(components.get(0), window);
-    }
-
     FullWindowedValueCoder(Coder<T> valueCoder,
                            Coder<? extends BoundedWindow> windowCoder) {
       super(valueCoder);
@@ -717,14 +703,6 @@ public abstract class WindowedValue<T> {
       return new ValueOnlyWindowedValueCoder<>(valueCoder);
     }
 
-    @JsonCreator
-    public static ValueOnlyWindowedValueCoder<?> of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<?>> components) {
-      checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size());
-      return of(components.get(0));
-    }
-
     ValueOnlyWindowedValueCoder(Coder<T> valueCoder) {
       super(valueCoder);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
index 37d41f7..c5e04b0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
@@ -17,18 +17,11 @@
  */
 package org.apache.beam.sdk.values;
 
-import static org.apache.beam.sdk.util.Structs.addBoolean;
-import static org.apache.beam.sdk.util.Structs.addString;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
 import java.io.Serializable;
 import java.util.Random;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.PropertyNames;
 
 /**
  * A {@link TupleTag} is a typed tag to use as the key of a heterogeneously typed tuple, like {@link
@@ -153,26 +146,11 @@ public class TupleTag<V> implements Serializable {
     return caller + "#" + nonce;
   }
 
-  @JsonCreator
-  @SuppressWarnings("unused")
-  private static TupleTag<?> fromJson(
-      @JsonProperty(PropertyNames.VALUE) String id,
-      @JsonProperty(PropertyNames.IS_GENERATED) boolean generated) {
-    return new TupleTag<>(id, generated);
-  }
-
   private TupleTag(String id, boolean generated) {
     this.id = id;
     this.generated = generated;
   }
 
-  public CloudObject asCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
-    addString(result, PropertyNames.VALUE, id);
-    addBoolean(result, PropertyNames.IS_GENERATED, generated);
-    return result;
-  }
-
   @Override
   public boolean equals(Object that) {
     if (that instanceof TupleTag) {

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java
deleted file mode 100644
index f6bacc4..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.apache.beam.sdk.util.Structs.addBoolean;
-import static org.apache.beam.sdk.util.Structs.addDouble;
-import static org.apache.beam.sdk.util.Structs.addLong;
-import static org.apache.beam.sdk.util.Structs.addString;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests Serializer implementation.
- */
-@RunWith(JUnit4.class)
-public class SerializerTest {
-  /**
-   * A POJO to use for testing serialization.
-   */
-  @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY,
-      property = PropertyNames.OBJECT_TYPE_NAME)
-  public static class TestRecord {
-    // TODO: When we apply property name typing to all non-final classes, the
-    // annotation on this class should be removed.
-    public String name;
-    public boolean ok;
-    public int value;
-    public double dValue;
-  }
-
-  @Test
-  public void testStatefulDeserialization() {
-    CloudObject object = CloudObject.forClass(TestRecord.class);
-
-    addString(object, "name", "foobar");
-    addBoolean(object, "ok", true);
-    addLong(object, "value", 42L);
-    addDouble(object, "dValue", .25);
-
-    TestRecord record = Serializer.deserialize(object, TestRecord.class);
-    Assert.assertEquals("foobar", record.name);
-    Assert.assertEquals(true, record.ok);
-    Assert.assertEquals(42L, record.value);
-    Assert.assertEquals(0.25, record.dValue, 0.0001);
-  }
-
-  private static class InjectedTestRecord {
-    private final String n;
-    private final int v;
-
-    @SuppressWarnings("unused")  // used for JSON serialization
-    public InjectedTestRecord(
-        @JsonProperty("name") String name,
-        @JsonProperty("value") int value) {
-      this.n = name;
-      this.v = value;
-    }
-
-    public String getName() {
-      return n;
-    }
-    public int getValue() {
-      return v;
-    }
-  }
-
-  @Test
-  public void testDeserializationInjection() {
-    CloudObject object = CloudObject.forClass(InjectedTestRecord.class);
-    addString(object, "name", "foobar");
-    addLong(object, "value", 42L);
-
-    InjectedTestRecord record =
-        Serializer.deserialize(object, InjectedTestRecord.class);
-
-    Assert.assertEquals("foobar", record.getName());
-    Assert.assertEquals(42L, record.getValue());
-  }
-
-  private static class FactoryInjectedTestRecord {
-    @JsonCreator
-    public static FactoryInjectedTestRecord of(
-        @JsonProperty("name") String name,
-        @JsonProperty("value") int value) {
-      return new FactoryInjectedTestRecord(name, value);
-    }
-
-    private final String n;
-    private final int v;
-
-    private FactoryInjectedTestRecord(String name, int value) {
-      this.n = name;
-      this.v = value;
-    }
-
-    public String getName() {
-      return n;
-    }
-    public int getValue() {
-      return v;
-    }
-  }
-
-  @Test
-  public void testDeserializationFactoryInjection() {
-    CloudObject object = CloudObject.forClass(FactoryInjectedTestRecord.class);
-    addString(object, "name", "foobar");
-    addLong(object, "value", 42L);
-
-    FactoryInjectedTestRecord record =
-        Serializer.deserialize(object, FactoryInjectedTestRecord.class);
-    Assert.assertEquals("foobar", record.getName());
-    Assert.assertEquals(42L, record.getValue());
-  }
-
-  private static class DerivedTestRecord extends TestRecord {
-    public String derived;
-  }
-
-  @Test
-  public void testSubclassDeserialization() {
-    CloudObject object = CloudObject.forClass(DerivedTestRecord.class);
-
-    addString(object, "name", "foobar");
-    addBoolean(object, "ok", true);
-    addLong(object, "value", 42L);
-    addDouble(object, "dValue", .25);
-    addString(object, "derived", "baz");
-
-    TestRecord result = Serializer.deserialize(object, TestRecord.class);
-    Assert.assertThat(result, Matchers.instanceOf(DerivedTestRecord.class));
-
-    DerivedTestRecord record = (DerivedTestRecord) result;
-    Assert.assertEquals("foobar", record.name);
-    Assert.assertEquals(true, record.ok);
-    Assert.assertEquals(42L, record.value);
-    Assert.assertEquals(0.25, record.dValue, 0.0001);
-    Assert.assertEquals("baz", record.derived);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StructsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StructsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StructsTest.java
deleted file mode 100644
index 91090d1..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StructsTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.apache.beam.sdk.util.Structs.addBoolean;
-import static org.apache.beam.sdk.util.Structs.addDouble;
-import static org.apache.beam.sdk.util.Structs.addList;
-import static org.apache.beam.sdk.util.Structs.addLong;
-import static org.apache.beam.sdk.util.Structs.addLongs;
-import static org.apache.beam.sdk.util.Structs.addNull;
-import static org.apache.beam.sdk.util.Structs.addString;
-import static org.apache.beam.sdk.util.Structs.addStringList;
-import static org.apache.beam.sdk.util.Structs.getBoolean;
-import static org.apache.beam.sdk.util.Structs.getDictionary;
-import static org.apache.beam.sdk.util.Structs.getInt;
-import static org.apache.beam.sdk.util.Structs.getListOfMaps;
-import static org.apache.beam.sdk.util.Structs.getLong;
-import static org.apache.beam.sdk.util.Structs.getObject;
-import static org.apache.beam.sdk.util.Structs.getString;
-import static org.apache.beam.sdk.util.Structs.getStrings;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for Structs.
- */
-@RunWith(JUnit4.class)
-public class StructsTest {
-  private List<Map<String, Object>> makeCloudObjects() {
-    List<Map<String, Object>> objects = new ArrayList<>();
-    {
-      CloudObject o = CloudObject.forClassName("string");
-      addString(o, "singletonStringKey", "stringValue");
-      objects.add(o);
-    }
-    {
-      CloudObject o = CloudObject.forClassName("long");
-      addLong(o, "singletonLongKey", 42L);
-      objects.add(o);
-    }
-    return objects;
-  }
-
-  private Map<String, Object> makeCloudDictionary() {
-    Map<String, Object> o = new HashMap<>();
-    addList(o, "emptyKey", Collections.<Map<String, Object>>emptyList());
-    addNull(o, "noStringsKey");
-    addString(o, "singletonStringKey", "stringValue");
-    addStringList(o, "multipleStringsKey", Arrays.asList("hi", "there", "bob"));
-    addLongs(o, "multipleLongsKey", 47L, 1L << 42, -5L);
-    addLong(o, "singletonLongKey", 42L);
-    addDouble(o, "singletonDoubleKey", 3.14);
-    addBoolean(o, "singletonBooleanKey", true);
-    addNull(o, "noObjectsKey");
-    addList(o, "multipleObjectsKey", makeCloudObjects());
-    return o;
-  }
-
-  @Test
-  public void testGetStringParameter() throws Exception {
-    Map<String, Object> o = makeCloudDictionary();
-
-    Assert.assertEquals(
-        "stringValue",
-        getString(o, "singletonStringKey"));
-    Assert.assertEquals(
-        "stringValue",
-        getString(o, "singletonStringKey", "defaultValue"));
-    Assert.assertEquals(
-        "defaultValue",
-        getString(o, "missingKey", "defaultValue"));
-
-    try {
-      getString(o, "missingKey");
-      Assert.fail("should have thrown an exception");
-    } catch (Exception exn) {
-      Assert.assertThat(exn.toString(),
-                        Matchers.containsString(
-                            "didn't find required parameter missingKey"));
-    }
-
-    try {
-      getString(o, "noStringsKey");
-      Assert.fail("should have thrown an exception");
-    } catch (Exception exn) {
-      Assert.assertThat(exn.toString(),
-                        Matchers.containsString("not a string"));
-    }
-
-    Assert.assertThat(getStrings(o, "noStringsKey", null), Matchers.<String>emptyIterable());
-    Assert.assertThat(getObject(o, "noStringsKey").keySet(), Matchers.<String>emptyIterable());
-    Assert.assertThat(getDictionary(o, "noStringsKey").keySet(), Matchers.<String>emptyIterable());
-    Assert.assertThat(getDictionary(o, "noStringsKey", null).keySet(),
-        Matchers.<String>emptyIterable());
-
-    try {
-      getString(o, "multipleStringsKey");
-      Assert.fail("should have thrown an exception");
-    } catch (Exception exn) {
-      Assert.assertThat(exn.toString(),
-                        Matchers.containsString("not a string"));
-    }
-
-    try {
-      getString(o, "emptyKey");
-      Assert.fail("should have thrown an exception");
-    } catch (Exception exn) {
-      Assert.assertThat(exn.toString(),
-                        Matchers.containsString("not a string"));
-    }
-  }
-
-  @Test
-  public void testGetBooleanParameter() throws Exception {
-    Map<String, Object> o = makeCloudDictionary();
-
-    Assert.assertEquals(
-        true,
-        getBoolean(o, "singletonBooleanKey", false));
-    Assert.assertEquals(
-        false,
-        getBoolean(o, "missingKey", false));
-
-    try {
-      getBoolean(o, "emptyKey", false);
-      Assert.fail("should have thrown an exception");
-    } catch (Exception exn) {
-      Assert.assertThat(exn.toString(),
-                        Matchers.containsString("not a boolean"));
-    }
-  }
-
-  @Test
-  public void testGetLongParameter() throws Exception {
-    Map<String, Object> o = makeCloudDictionary();
-
-    Assert.assertEquals(
-        (Long) 42L,
-        getLong(o, "singletonLongKey", 666L));
-    Assert.assertEquals(
-        (Integer) 42,
-        getInt(o, "singletonLongKey", 666));
-    Assert.assertEquals(
-        (Long) 666L,
-        getLong(o, "missingKey", 666L));
-
-    try {
-      getLong(o, "emptyKey", 666L);
-      Assert.fail("should have thrown an exception");
-    } catch (Exception exn) {
-      Assert.assertThat(exn.toString(),
-                        Matchers.containsString("not a long"));
-    }
-    try {
-      getInt(o, "emptyKey", 666);
-      Assert.fail("should have thrown an exception");
-    } catch (Exception exn) {
-      Assert.assertThat(exn.toString(),
-                        Matchers.containsString("not an int"));
-    }
-  }
-
-  @Test
-  public void testGetListOfMaps() throws Exception {
-    Map<String, Object> o = makeCloudDictionary();
-
-    Assert.assertEquals(
-        makeCloudObjects(),
-        getListOfMaps(o, "multipleObjectsKey", null));
-
-    try {
-      getListOfMaps(o, "singletonLongKey", null);
-      Assert.fail("should have thrown an exception");
-    } catch (Exception exn) {
-      Assert.assertThat(exn.toString(),
-                        Matchers.containsString("not a list"));
-    }
-  }
-
-  // TODO: Test builder operations.
-}


[2/3] beam git commit: [BEAM-1871] Move GCP specific serialization CloudObject and supporting translation code to Dataflow runner module

Posted by lc...@apache.org.
[BEAM-1871] Move GCP specific serialization CloudObject and supporting translation code to Dataflow runner module


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a5627b1a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a5627b1a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a5627b1a

Branch: refs/heads/master
Commit: a5627b1a64696d7526bc5aeec5a0b51571fb5ef1
Parents: 320f9af
Author: Luke Cwik <lc...@google.com>
Authored: Wed May 3 10:03:45 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed May 3 12:46:17 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |   2 +-
 .../dataflow/DataflowPipelineTranslator.java    |  18 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   2 +-
 .../beam/runners/dataflow/ReadTranslator.java   |   8 +-
 .../dataflow/internal/CustomSources.java        |   6 +-
 .../util/AvroCoderCloudObjectTranslator.java    |   2 -
 .../beam/runners/dataflow/util/CloudObject.java |   1 -
 .../runners/dataflow/util/CloudObjectKinds.java |   2 -
 .../dataflow/util/CloudObjectTranslator.java    |   2 -
 .../dataflow/util/CloudObjectTranslators.java   |  23 +-
 .../runners/dataflow/util/CloudObjects.java     |   1 -
 .../CoderCloudObjectTranslatorRegistrar.java    |   1 -
 .../runners/dataflow/util/PropertyNames.java    | 112 ++++++
 .../SerializableCoderCloudObjectTranslator.java |   2 -
 .../beam/runners/dataflow/util/Serializer.java  | 262 -------------
 .../beam/runners/dataflow/util/Structs.java     | 372 +++++++++++++++++++
 .../DataflowPipelineTranslatorTest.java         |  10 +-
 .../runners/dataflow/util/CloudObjectsTest.java |   1 -
 .../beam/runners/dataflow/util/StructsTest.java | 206 ++++++++++
 .../apache/beam/sdk/coders/CollectionCoder.java |  13 -
 .../apache/beam/sdk/coders/IterableCoder.java   |  13 -
 .../org/apache/beam/sdk/coders/KvCoder.java     |  13 -
 .../beam/sdk/coders/LengthPrefixCoder.java      |  13 -
 .../org/apache/beam/sdk/coders/ListCoder.java   |  13 -
 .../org/apache/beam/sdk/coders/SetCoder.java    |  16 -
 .../apache/beam/sdk/util/CloudKnownType.java    | 143 -------
 .../org/apache/beam/sdk/util/CloudObject.java   | 187 ----------
 .../org/apache/beam/sdk/util/CoderUtils.java    | 117 ------
 .../org/apache/beam/sdk/util/PropertyNames.java | 112 ------
 .../org/apache/beam/sdk/util/Serializer.java    | 147 --------
 .../java/org/apache/beam/sdk/util/Structs.java  | 371 ------------------
 .../java/org/apache/beam/sdk/util/Values.java   |  88 -----
 .../org/apache/beam/sdk/util/WindowedValue.java |  22 --
 .../org/apache/beam/sdk/values/TupleTag.java    |  22 --
 .../apache/beam/sdk/util/SerializerTest.java    | 162 --------
 .../org/apache/beam/sdk/util/StructsTest.java   | 206 ----------
 36 files changed, 729 insertions(+), 1962 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index bbad156..30ef84d 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <dataflow.container_version>beam-master-20170502</dataflow.container_version>
+    <dataflow.container_version>beam-master-20170503</dataflow.container_version>
     <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
     <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
   </properties>

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 28a9c1c..05edd28 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -21,15 +21,15 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.beam.runners.dataflow.util.Structs.addBoolean;
+import static org.apache.beam.runners.dataflow.util.Structs.addDictionary;
+import static org.apache.beam.runners.dataflow.util.Structs.addList;
+import static org.apache.beam.runners.dataflow.util.Structs.addLong;
+import static org.apache.beam.runners.dataflow.util.Structs.addObject;
+import static org.apache.beam.runners.dataflow.util.Structs.addString;
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
-import static org.apache.beam.sdk.util.Structs.addBoolean;
-import static org.apache.beam.sdk.util.Structs.addDictionary;
-import static org.apache.beam.sdk.util.Structs.addList;
-import static org.apache.beam.sdk.util.Structs.addLong;
-import static org.apache.beam.sdk.util.Structs.addObject;
-import static org.apache.beam.sdk.util.Structs.addString;
-import static org.apache.beam.sdk.util.Structs.getString;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -62,9 +62,11 @@ import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle;
 import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
 import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.runners.dataflow.util.OutputReference;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.Coder;
@@ -86,8 +88,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 6aaa11b..7da1755 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -73,6 +73,7 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
 import org.apache.beam.runners.dataflow.util.DataflowTransport;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.PipelineResult.State;
@@ -115,7 +116,6 @@ import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.PathValidator;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.ValueWithRecordId;

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
index bc68511..c304c32 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
@@ -17,20 +17,20 @@
  */
 package org.apache.beam.runners.dataflow;
 
-import static org.apache.beam.sdk.util.Structs.addBoolean;
-import static org.apache.beam.sdk.util.Structs.addDictionary;
-import static org.apache.beam.sdk.util.Structs.addLong;
+import static org.apache.beam.runners.dataflow.util.Structs.addBoolean;
+import static org.apache.beam.runners.dataflow.util.Structs.addDictionary;
+import static org.apache.beam.runners.dataflow.util.Structs.addLong;
 
 import com.google.api.services.dataflow.model.SourceMetadata;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.dataflow.internal.CustomSources;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.sdk.io.FileBasedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.values.PValue;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
index 778ccf3..0d93566 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
@@ -19,9 +19,9 @@ package org.apache.beam.runners.dataflow.internal;
 
 import static com.google.api.client.util.Base64.encodeBase64String;
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.runners.dataflow.util.Structs.addString;
+import static org.apache.beam.runners.dataflow.util.Structs.addStringList;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
-import static org.apache.beam.sdk.util.Structs.addString;
-import static org.apache.beam.sdk.util.Structs.addStringList;
 
 import com.google.api.services.dataflow.model.SourceMetadata;
 import com.google.common.annotations.VisibleForTesting;
@@ -29,11 +29,11 @@ import com.google.protobuf.ByteString;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.CloudObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
index 444849d..c4d807e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.dataflow.util;
 
 import org.apache.avro.Schema;
 import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Structs;
 
 /** A {@link CloudObjectTranslator} for {@link AvroCoder}. */
 class AvroCoderCloudObjectTranslator implements CloudObjectTranslator<AvroCoder> {

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java
index e4dd9be..b3680e9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java
@@ -24,7 +24,6 @@ import com.google.api.client.json.GenericJson;
 import com.google.api.client.util.Key;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.util.PropertyNames;
 
 /**
  * A representation of an arbitrary Java object to be instantiated by Dataflow

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
index 1499f17..403ade2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
@@ -18,8 +18,6 @@
 
 package org.apache.beam.runners.dataflow.util;
 
-import org.apache.beam.sdk.util.CloudObject;
-
 /**
  * Known kinds of {@link CloudObject}.
  */

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
index 534370f..775495b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
@@ -18,8 +18,6 @@
 
 package org.apache.beam.runners.dataflow.util;
 
-import org.apache.beam.sdk.util.CloudObject;
-
 /**
  * A translator that takes an object and creates a {@link CloudObject} which can be converted back
  * to the original object.

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
index f3e3312..012a669 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
@@ -38,14 +38,12 @@ import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
 import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
-import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.StringUtils;
-import org.apache.beam.sdk.util.Structs;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -94,7 +92,9 @@ class CloudObjectTranslators {
 
       @Override
       public KvCoder fromCloudObject(CloudObject object) {
-        return KvCoder.of(getComponents(object));
+        List<Coder<?>> components = getComponents(object);
+        checkArgument(components.size() == 2, "Expecting 2 components, got %s", components.size());
+        return KvCoder.of(components.get(0), components.get(1));
       }
 
       @Override
@@ -125,7 +125,9 @@ class CloudObjectTranslators {
 
       @Override
       public IterableCoder fromCloudObject(CloudObject object) {
-        return IterableCoder.of(getComponents(object));
+        List<Coder<?>> components = getComponents(object);
+        checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
+        return IterableCoder.of(components.get(0));
       }
 
       @Override
@@ -155,7 +157,9 @@ class CloudObjectTranslators {
 
       @Override
       public LengthPrefixCoder fromCloudObject(CloudObject object) {
-        return LengthPrefixCoder.of(getComponents(object));
+        List<Coder<?>> components = getComponents(object);
+        checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
+        return LengthPrefixCoder.of(components.get(0));
       }
 
       @Override
@@ -246,7 +250,12 @@ class CloudObjectTranslators {
 
       @Override
       public FullWindowedValueCoder fromCloudObject(CloudObject object) {
-        return FullWindowedValueCoder.of(getComponents(object));
+        List<Coder<?>> components = getComponents(object);
+        checkArgument(components.size() == 2,
+            "Expecting 2 components, got " + components.size());
+        @SuppressWarnings("unchecked")
+        Coder<? extends BoundedWindow> window = (Coder<? extends BoundedWindow>) components.get(1);
+        return FullWindowedValueCoder.of(components.get(0), window);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
index 9383c48..42c7012 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.ServiceLoader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.util.CloudObject;
 
 /** Utilities for converting an object to a {@link CloudObject}. */
 public class CloudObjects {

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java
index 446eb3b..928e629 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java
@@ -22,7 +22,6 @@ import com.google.auto.service.AutoService;
 import java.util.Map;
 import java.util.ServiceLoader;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.CloudObject;
 
 /**
  * {@link Coder} authors have the ability to automatically have their {@link Coder} registered with

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
new file mode 100644
index 0000000..c8c9903
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+/**
+ * Constant property names used by the SDK in CloudWorkflow specifications.
+ */
+public class PropertyNames {
+  public static final String ALLOWED_ENCODINGS = "allowed_encodings";
+  public static final String APPEND_TRAILING_NEWLINES = "append_trailing_newlines";
+  public static final String BIGQUERY_CREATE_DISPOSITION = "create_disposition";
+  public static final String BIGQUERY_DATASET = "dataset";
+  public static final String BIGQUERY_PROJECT = "project";
+  public static final String BIGQUERY_SCHEMA = "schema";
+  public static final String BIGQUERY_TABLE = "table";
+  public static final String BIGQUERY_QUERY = "bigquery_query";
+  public static final String BIGQUERY_FLATTEN_RESULTS = "bigquery_flatten_results";
+  public static final String BIGQUERY_USE_LEGACY_SQL = "bigquery_use_legacy_sql";
+  public static final String BIGQUERY_WRITE_DISPOSITION = "write_disposition";
+  public static final String BIGQUERY_EXPORT_FORMAT = "bigquery_export_format";
+  public static final String BIGQUERY_EXPORT_SCHEMA = "bigquery_export_schema";
+  public static final String CO_GBK_RESULT_SCHEMA = "co_gbk_result_schema";
+  public static final String COMBINE_FN = "combine_fn";
+  public static final String COMPONENT_ENCODINGS = "component_encodings";
+  public static final String COMPRESSION_TYPE = "compression_type";
+  public static final String CUSTOM_SOURCE_FORMAT = "custom_source";
+  public static final String CONCAT_SOURCE_SOURCES = "sources";
+  public static final String CONCAT_SOURCE_BASE_SPECS = "base_specs";
+  public static final String SOURCE_STEP_INPUT = "custom_source_step_input";
+  public static final String SOURCE_SPEC = "spec";
+  public static final String SOURCE_METADATA = "metadata";
+  public static final String SOURCE_DOES_NOT_NEED_SPLITTING = "does_not_need_splitting";
+  public static final String SOURCE_PRODUCES_SORTED_KEYS = "produces_sorted_keys";
+  public static final String SOURCE_IS_INFINITE = "is_infinite";
+  public static final String SOURCE_ESTIMATED_SIZE_BYTES = "estimated_size_bytes";
+  public static final String ELEMENT = "element";
+  public static final String ELEMENTS = "elements";
+  public static final String ENCODING = "encoding";
+  public static final String ENCODING_ID = "encoding_id";
+  public static final String END_INDEX = "end_index";
+  public static final String END_OFFSET = "end_offset";
+  public static final String END_SHUFFLE_POSITION = "end_shuffle_position";
+  public static final String ENVIRONMENT_VERSION_JOB_TYPE_KEY = "job_type";
+  public static final String ENVIRONMENT_VERSION_MAJOR_KEY = "major";
+  public static final String FILENAME = "filename";
+  public static final String FILENAME_PREFIX = "filename_prefix";
+  public static final String FILENAME_SUFFIX = "filename_suffix";
+  public static final String FILEPATTERN = "filepattern";
+  public static final String FOOTER = "footer";
+  public static final String FORMAT = "format";
+  public static final String HEADER = "header";
+  public static final String INPUTS = "inputs";
+  public static final String INPUT_CODER = "input_coder";
+  public static final String IS_GENERATED = "is_generated";
+  public static final String IS_MERGING_WINDOW_FN = "is_merging_window_fn";
+  public static final String IS_PAIR_LIKE = "is_pair_like";
+  public static final String IS_STREAM_LIKE = "is_stream_like";
+  public static final String IS_WRAPPER = "is_wrapper";
+  public static final String DISALLOW_COMBINER_LIFTING = "disallow_combiner_lifting";
+  public static final String NON_PARALLEL_INPUTS = "non_parallel_inputs";
+  public static final String NUM_SHARD_CODERS = "num_shard_coders";
+  public static final String NUM_METADATA_SHARD_CODERS = "num_metadata_shard_coders";
+  public static final String NUM_SHARDS = "num_shards";
+  public static final String OBJECT_TYPE_NAME = "@type";
+  public static final String OUTPUT = "output";
+  public static final String OUTPUT_INFO = "output_info";
+  public static final String OUTPUT_NAME = "output_name";
+  public static final String PARALLEL_INPUT = "parallel_input";
+  public static final String PHASE = "phase";
+  public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label";
+  public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = "pubsub_serialized_attributes_fn";
+  public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
+  public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override";
+  public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = "pubsub_timestamp_label";
+  public static final String PUBSUB_TOPIC = "pubsub_topic";
+  public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override";
+  public static final String SCALAR_FIELD_NAME = "value";
+  public static final String SERIALIZED_FN = "serialized_fn";
+  public static final String SHARD_NAME_TEMPLATE = "shard_template";
+  public static final String SHUFFLE_KIND = "shuffle_kind";
+  public static final String SHUFFLE_READER_CONFIG = "shuffle_reader_config";
+  public static final String SHUFFLE_WRITER_CONFIG = "shuffle_writer_config";
+  public static final String SORT_VALUES = "sort_values";
+  public static final String START_INDEX = "start_index";
+  public static final String START_OFFSET = "start_offset";
+  public static final String START_SHUFFLE_POSITION = "start_shuffle_position";
+  public static final String STRIP_TRAILING_NEWLINES = "strip_trailing_newlines";
+  public static final String TUPLE_TAGS = "tuple_tags";
+  public static final String USE_INDEXED_FORMAT = "use_indexed_format";
+  public static final String USER_FN = "user_fn";
+  public static final String USER_NAME = "user_name";
+  public static final String USES_KEYED_STATE = "uses_keyed_state";
+  public static final String VALIDATE_SINK = "validate_sink";
+  public static final String VALIDATE_SOURCE = "validate_source";
+  public static final String VALUE = "value";
+  public static final String DISPLAY_DATA = "display_data";
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
index 67c021c..dcc311e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
@@ -22,8 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Structs;
 
 /** A {@link CloudObjectTranslator} for {@link SerializableCoder}. */
 class SerializableCoderCloudObjectTranslator implements CloudObjectTranslator<SerializableCoder> {

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java
deleted file mode 100644
index e2bcafe..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.util;
-
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
-import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DatabindContext;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
-import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.fasterxml.jackson.databind.type.TypeFactory;
-import com.google.common.collect.ImmutableMap;
-import java.lang.reflect.TypeVariable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.LengthPrefixCoder;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * Utility for converting objects between Java and Cloud representations.
- *
- * @deprecated Will no longer be used once all coders are converted via {@link CloudObjects}.
- */
-@Deprecated
-public final class Serializer {
-  /** A mapping from well known coder types to their implementing classes. */
-  private static final Map<String, Class<?>> WELL_KNOWN_CODER_TYPES =
-      ImmutableMap.<String, Class<?>>builder()
-          .put("kind:pair", KvCoder.class)
-          .put("kind:stream", IterableCoder.class)
-          .put("kind:global_window", GlobalWindow.Coder.class)
-          .put("kind:interval_window", IntervalWindow.IntervalWindowCoder.class)
-          .put("kind:length_prefix", LengthPrefixCoder.class)
-          .put("kind:windowed_value", WindowedValue.FullWindowedValueCoder.class)
-          .build();
-
-  // Delay initialization of statics until the first call to Serializer.
-  private static class SingletonHelper {
-    static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
-    static final ObjectMapper TREE_MAPPER = createTreeMapper();
-
-    /**
-     * Creates the object mapper that will be used for serializing Google API
-     * client maps into Jackson trees.
-     */
-    private static ObjectMapper createTreeMapper() {
-      return new ObjectMapper();
-    }
-
-    /**
-     * Creates the object mapper that will be used for deserializing Jackson
-     * trees into objects.
-     */
-    private static ObjectMapper createObjectMapper() {
-      ObjectMapper m = new ObjectMapper();
-      // Ignore properties that are not used by the object.
-      m.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
-
-      // For parameters of type Object, use the @type property to determine the
-      // class to instantiate.
-      //
-      // TODO: It would be ideal to do this for all non-final classes.  The
-      // problem with using DefaultTyping.NON_FINAL is that it insists on having
-      // type information in the JSON for classes with useful default
-      // implementations, such as List.  Ideally, we'd combine these defaults
-      // with available type information if that information's present.
-      m.enableDefaultTypingAsProperty(
-           ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT,
-           PropertyNames.OBJECT_TYPE_NAME);
-
-      m.registerModule(new Jackson2Module());
-
-      return m;
-    }
-  }
-
-  /**
-   * Deserializes an object from a Dataflow structured encoding (represented in
-   * Java as a map).
-   *
-   * <p>The standard Dataflow SDK object serialization protocol is based on JSON.
-   * Data is typically encoded as a JSON object whose fields represent the
-   * object's data.
-   *
-   * <p>The actual deserialization is performed by Jackson, which can deserialize
-   * public fields, use JavaBean setters, or use injection annotations to
-   * indicate how to construct the object.  The {@link ObjectMapper} used is
-   * configured to use the "@type" field as the name of the class to instantiate
-   * (supporting polymorphic types), and may be further configured by
-   * annotations or via {@link ObjectMapper#registerModule}.
-   *
-   * @see <a href="http://wiki.fasterxml.com/JacksonFAQ#Data_Binding.2C_general">
-   * Jackson Data-Binding</a>
-   * @see <a href="https://github.com/FasterXML/jackson-annotations/wiki/Jackson-Annotations">
-   * Jackson-Annotations</a>
-   * @param serialized the object in untyped decoded form (i.e. a nested {@link Map})
-   * @param clazz the expected object class
-   */
-  public static <T> T deserialize(Map<String, Object> serialized, Class<T> clazz) {
-    try {
-      return SingletonHelper.OBJECT_MAPPER.treeToValue(
-          SingletonHelper.TREE_MAPPER.valueToTree(
-              deserializeCloudKnownTypes(serialized)),
-          clazz);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(
-          "Unable to deserialize class " + clazz, e);
-    }
-  }
-
-  /**
-   * Recursively walks the supplied map, looking for well-known cloud type information (keyed as
-   * {@link PropertyNames#OBJECT_TYPE_NAME}, matching a URI value from the {@link CloudKnownType}
-   * enum. Upon finding this type information, it converts it into the correspondingly typed Java
-   * value.
-   */
-  @SuppressWarnings("unchecked")
-  private static Object deserializeCloudKnownTypes(Object src) {
-    if (src instanceof Map) {
-      Map<String, Object> srcMap = (Map<String, Object>) src;
-      @Nullable Object value = srcMap.get(PropertyNames.SCALAR_FIELD_NAME);
-      @Nullable CloudKnownType type =
-          CloudKnownType.forUri((String) srcMap.get(PropertyNames.OBJECT_TYPE_NAME));
-      if (type != null && value != null) {
-        // It's a value of a well-known cloud type; let the known type handler
-        // handle the translation.
-        Object result = type.parse(value, type.defaultClass());
-        return result;
-      }
-      // Otherwise, it's just an ordinary map.
-      Map<String, Object> dest = new HashMap<>(srcMap.size());
-      for (Map.Entry<String, Object> entry : srcMap.entrySet()) {
-        dest.put(entry.getKey(), deserializeCloudKnownTypes(entry.getValue()));
-      }
-      return dest;
-    }
-    if (src instanceof List) {
-      List<Object> srcList = (List<Object>) src;
-      List<Object> dest = new ArrayList<>(srcList.size());
-      for (Object obj : srcList) {
-        dest.add(deserializeCloudKnownTypes(obj));
-      }
-      return dest;
-    }
-    // Neither a Map nor a List; no translation needed.
-    return src;
-  }
-
-  /**
-   * A {@link com.fasterxml.jackson.databind.Module} that adds the type
-   * resolver needed for Coder definitions.
-   */
-  static final class Jackson2Module extends SimpleModule {
-    /**
-     * The Coder custom type resolver.
-     *
-     * <p>This resolver resolves coders. If the Coder ID is a particular
-     * well-known identifier, it's replaced with the corresponding class.
-     * All other Coder instances are resolved by class name, using the package
-     * org.apache.beam.sdk.coders if there are no "."s in the ID.
-     */
-    private static final class Resolver extends TypeIdResolverBase {
-      @SuppressWarnings("unused") // Used via @JsonTypeIdResolver annotation on Mixin
-      public Resolver() {
-        super(TypeFactory.defaultInstance().constructType(Coder.class),
-            TypeFactory.defaultInstance());
-      }
-
-      @Override
-      public JavaType typeFromId(DatabindContext context, String id) {
-        Class<?> clazz = getClassForId(id);
-        @SuppressWarnings("rawtypes")
-        TypeVariable[] tvs = clazz.getTypeParameters();
-        JavaType[] types = new JavaType[tvs.length];
-        for (int lupe = 0; lupe < tvs.length; lupe++) {
-          types[lupe] = TypeFactory.unknownType();
-        }
-        return _typeFactory.constructSimpleType(clazz, types);
-      }
-
-      private Class<?> getClassForId(String id) {
-        try {
-          if (id.contains(".")) {
-            return Class.forName(id);
-          }
-
-          if (WELL_KNOWN_CODER_TYPES.containsKey(id)) {
-            return WELL_KNOWN_CODER_TYPES.get(id);
-          }
-
-          // Otherwise, see if the ID is the name of a class in
-          // org.apache.beam.sdk.coders.  We do this via creating
-          // the class object so that class loaders have a chance to get
-          // involved -- and since we need the class object anyway.
-          return Class.forName(Coder.class.getPackage().getName() + "." + id);
-        } catch (ClassNotFoundException e) {
-          throw new RuntimeException("Unable to convert coder ID " + id + " to class", e);
-        }
-      }
-
-      @Override
-      public String idFromValueAndType(Object o, Class<?> clazz) {
-        return clazz.getName();
-      }
-
-      @Override
-      public String idFromValue(Object o) {
-        return o.getClass().getName();
-      }
-
-      @Override
-      public JsonTypeInfo.Id getMechanism() {
-        return JsonTypeInfo.Id.CUSTOM;
-      }
-    }
-
-    /**
-     * The mixin class defining how Coders are handled by the deserialization
-     * {@link ObjectMapper}.
-     *
-     * <p>This is done via a mixin so that this resolver is <i>only</i> used
-     * during deserialization requested by the Apache Beam SDK.
-     */
-    @JsonTypeIdResolver(Resolver.class)
-    @JsonTypeInfo(use = Id.CUSTOM, include = As.PROPERTY, property = PropertyNames.OBJECT_TYPE_NAME)
-    private static final class Mixin {}
-
-    public Jackson2Module() {
-      super("BeamCoders");
-      setMixInAnnotation(Coder.class, Mixin.class);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Structs.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Structs.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Structs.java
new file mode 100644
index 0000000..1929f3b
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Structs.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import com.google.api.client.util.Data;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.StringUtils;
+
+/**
+ * A collection of static methods for manipulating data structure representations transferred via
+ * the Dataflow API.
+ */
+public final class Structs {
+  private Structs() {} // Non-instantiable
+
+  public static String getString(Map<String, Object> map, String name) {
+    return getValue(map, name, String.class, "a string");
+  }
+
+  public static String getString(
+      Map<String, Object> map, String name, @Nullable String defaultValue) {
+    return getValue(map, name, String.class, "a string", defaultValue);
+  }
+
+  public static byte[] getBytes(Map<String, Object> map, String name) {
+    @Nullable byte[] decoded = getBytes(map, name, null);
+    if (decoded != null) {
+      return decoded;
+    }
+    throw new ParameterNotFoundException(name, map);
+  }
+
+  @Nullable
+  public static byte[] getBytes(
+      Map<String, Object> map, String name, @Nullable byte[] defaultValue) {
+    @Nullable String jsonString = getString(map, name, null);
+    if (jsonString == null) {
+      return defaultValue;
+    }
+    // TODO: Need to agree on a format for encoding bytes in
+    // a string that can be sent to the backend, over the cloud
+    // map task work API.  base64 encoding seems pretty common.  Switch to it?
+    return StringUtils.jsonStringToByteArray(jsonString);
+  }
+
+  public static Boolean getBoolean(Map<String, Object> map, String name) {
+    return getValue(map, name, Boolean.class, "a boolean");
+  }
+
+  @Nullable
+  public static Boolean getBoolean(
+      Map<String, Object> map, String name, @Nullable Boolean defaultValue) {
+    return getValue(map, name, Boolean.class, "a boolean", defaultValue);
+  }
+
+  public static Long getLong(Map<String, Object> map, String name) {
+    return getValue(map, name, Long.class, "a long");
+  }
+
+  @Nullable
+  public static Long getLong(Map<String, Object> map, String name, @Nullable Long defaultValue) {
+    return getValue(map, name, Long.class, "a long", defaultValue);
+  }
+
+  public static Integer getInt(Map<String, Object> map, String name) {
+    return getValue(map, name, Integer.class, "an int");
+  }
+
+  @Nullable
+  public static Integer getInt(
+      Map<String, Object> map, String name, @Nullable Integer defaultValue) {
+    return getValue(map, name, Integer.class, "an int", defaultValue);
+  }
+
+  @Nullable
+  public static List<String> getStrings(
+      Map<String, Object> map, String name, @Nullable List<String> defaultValue) {
+    @Nullable Object value = map.get(name);
+    if (value == null) {
+      if (map.containsKey(name)) {
+        throw new IncorrectTypeException(name, map, "a string or a list");
+      }
+      return defaultValue;
+    }
+    if (Data.isNull(value)) {
+      // This is a JSON literal null.  When represented as a list of strings,
+      // this is an empty list.
+      return Collections.<String>emptyList();
+    }
+    @Nullable String singletonString = decodeValue(value, String.class);
+    if (singletonString != null) {
+      return Collections.singletonList(singletonString);
+    }
+    if (!(value instanceof List)) {
+      throw new IncorrectTypeException(name, map, "a string or a list");
+    }
+    @SuppressWarnings("unchecked")
+    List<Object> elements = (List<Object>) value;
+    List<String> result = new ArrayList<>(elements.size());
+    for (Object o : elements) {
+      @Nullable String s = decodeValue(o, String.class);
+      if (s == null) {
+        throw new IncorrectTypeException(name, map, "a list of strings");
+      }
+      result.add(s);
+    }
+    return result;
+  }
+
+  public static Map<String, Object> getObject(Map<String, Object> map, String name) {
+    @Nullable Map<String, Object> found = getObject(map, name, null);
+    if (found != null) {
+      return found;
+    }
+    throw new ParameterNotFoundException(name, map);
+  }
+
+  @Nullable
+  public static Map<String, Object> getObject(
+      Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue) {
+    @Nullable Object value = map.get(name);
+    if (value == null) {
+      if (map.containsKey(name)) {
+        throw new IncorrectTypeException(name, map, "an object");
+      }
+      return defaultValue;
+    }
+    return checkObject(value, map, name);
+  }
+
+  private static Map<String, Object> checkObject(
+      Object value, Map<String, Object> map, String name) {
+    if (Data.isNull(value)) {
+      // This is a JSON literal null.  When represented as an object, this is an
+      // empty map.
+      return Collections.<String, Object>emptyMap();
+    }
+    if (!(value instanceof Map)) {
+      throw new IncorrectTypeException(name, map, "an object (not a map)");
+    }
+    @SuppressWarnings("unchecked")
+    Map<String, Object> mapValue = (Map<String, Object>) value;
+    if (!mapValue.containsKey(PropertyNames.OBJECT_TYPE_NAME)) {
+      throw new IncorrectTypeException(
+          name, map, "an object (no \"" + PropertyNames.OBJECT_TYPE_NAME + "\" field)");
+    }
+    return mapValue;
+  }
+
+  @Nullable
+  public static List<Map<String, Object>> getListOfMaps(
+      Map<String, Object> map, String name, @Nullable List<Map<String, Object>> defaultValue) {
+    @Nullable Object value = map.get(name);
+    if (value == null) {
+      if (map.containsKey(name)) {
+        throw new IncorrectTypeException(name, map, "a list");
+      }
+      return defaultValue;
+    }
+    if (Data.isNull(value)) {
+      // This is a JSON literal null.  When represented as a list,
+      // this is an empty list.
+      return Collections.<Map<String, Object>>emptyList();
+    }
+
+    if (!(value instanceof List)) {
+      throw new IncorrectTypeException(name, map, "a list");
+    }
+
+    List<?> elements = (List<?>) value;
+    for (Object elem : elements) {
+      if (!(elem instanceof Map)) {
+        throw new IncorrectTypeException(name, map, "a list of Map objects");
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    List<Map<String, Object>> result = (List<Map<String, Object>>) elements;
+    return result;
+  }
+
+  public static Map<String, Object> getDictionary(Map<String, Object> map, String name) {
+    @Nullable Object value = map.get(name);
+    if (value == null) {
+      throw new ParameterNotFoundException(name, map);
+    }
+    if (Data.isNull(value)) {
+      // This is a JSON literal null.  When represented as a dictionary, this is
+      // an empty map.
+      return Collections.<String, Object>emptyMap();
+    }
+    if (!(value instanceof Map)) {
+      throw new IncorrectTypeException(name, map, "a dictionary");
+    }
+    @SuppressWarnings("unchecked")
+    Map<String, Object> result = (Map<String, Object>) value;
+    return result;
+  }
+
+  @Nullable
+  public static Map<String, Object> getDictionary(
+      Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue) {
+    @Nullable Object value = map.get(name);
+    if (value == null) {
+      if (map.containsKey(name)) {
+        throw new IncorrectTypeException(name, map, "a dictionary");
+      }
+      return defaultValue;
+    }
+    if (Data.isNull(value)) {
+      // This is a JSON literal null.  When represented as a dictionary, this is
+      // an empty map.
+      return Collections.<String, Object>emptyMap();
+    }
+    if (!(value instanceof Map)) {
+      throw new IncorrectTypeException(name, map, "a dictionary");
+    }
+    @SuppressWarnings("unchecked")
+    Map<String, Object> result = (Map<String, Object>) value;
+    return result;
+  }
+
+  // Builder operations.
+
+  public static void addString(Map<String, Object> map, String name, String value) {
+    addObject(map, name, CloudObject.forString(value));
+  }
+
+  public static void addBoolean(Map<String, Object> map, String name, boolean value) {
+    addObject(map, name, CloudObject.forBoolean(value));
+  }
+
+  public static void addLong(Map<String, Object> map, String name, long value) {
+    addObject(map, name, CloudObject.forInteger(value));
+  }
+
+  public static void addObject(Map<String, Object> map, String name, Map<String, Object> value) {
+    map.put(name, value);
+  }
+
+  public static void addNull(Map<String, Object> map, String name) {
+    map.put(name, Data.nullOf(Object.class));
+  }
+
+  public static void addLongs(Map<String, Object> map, String name, long... longs) {
+    List<Map<String, Object>> encoded = new ArrayList<>(longs.length);
+    for (long number : longs) {
+      encoded.add(CloudObject.forInteger(number));
+    }
+    map.put(name, encoded);
+  }
+
+  public static void addList(
+      Map<String, Object> map, String name, List<? extends Map<String, Object>> elements) {
+    map.put(name, elements);
+  }
+
+  public static void addStringList(Map<String, Object> map, String name, List<String> elements) {
+    List<CloudObject> wrapped = new ArrayList<>(elements.size());
+    for (String text : elements) {
+      wrapped.add(CloudObject.forString(text));
+    }
+    map.put(name, wrapped);
+  }
+
+  public static <T extends Map<String, Object>> void addList(
+      Map<String, Object> map, String name, T[] elements) {
+    map.put(name, Arrays.asList(elements));
+  }
+
+  public static void addDictionary(
+      Map<String, Object> map, String name, Map<String, Object> value) {
+    map.put(name, value);
+  }
+
+  public static void addDouble(Map<String, Object> map, String name, Double value) {
+    addObject(map, name, CloudObject.forFloat(value));
+  }
+
+  // Helper methods for a few of the accessor methods.
+
+  private static <T> T getValue(Map<String, Object> map, String name, Class<T> clazz, String type) {
+    @Nullable T decoded = getValue(map, name, clazz, type, null);
+    if (decoded != null) {
+      return decoded;
+    }
+    throw new ParameterNotFoundException(name, map);
+  }
+
+  @Nullable
+  private static <T> T getValue(
+      Map<String, Object> map, String name, Class<T> clazz, String type, @Nullable T defaultValue) {
+    @Nullable Object value = map.get(name);
+    if (value == null) {
+      if (map.containsKey(name)) {
+        throw new IncorrectTypeException(name, map, type);
+      }
+      return defaultValue;
+    }
+    T result = decodeValue(value, clazz);
+    if (result == null) {
+      // The value exists, but can't be decoded.
+      throw new IncorrectTypeException(name, map, type);
+    }
+    return result;
+  }
+
+  @Nullable
+  private static <T> T decodeValue(Object value, Class<T> clazz) {
+    try {
+      if (value.getClass() == clazz) {
+        // decodeValue() is only called for final classes; if the class matches,
+        // it's safe to just return the value, and if it doesn't match, decoding
+        // is needed.
+        return clazz.cast(value);
+      }
+      if (!(value instanceof Map)) {
+        return null;
+      }
+      @SuppressWarnings("unchecked")
+      Map<String, Object> map = (Map<String, Object>) value;
+      @Nullable String typeName = (String) map.get(PropertyNames.OBJECT_TYPE_NAME);
+      if (typeName == null) {
+        return null;
+      }
+      @Nullable CloudKnownType knownType = CloudKnownType.forUri(typeName);
+      if (knownType == null) {
+        return null;
+      }
+      @Nullable Object scalar = map.get(PropertyNames.SCALAR_FIELD_NAME);
+      if (scalar == null) {
+        return null;
+      }
+      return knownType.parse(scalar, clazz);
+    } catch (ClassCastException e) {
+      // If any class cast fails during decoding, the value's not decodable.
+      return null;
+    }
+  }
+
+  private static final class ParameterNotFoundException extends RuntimeException {
+    public ParameterNotFoundException(String name, Map<String, Object> map) {
+      super("didn't find required parameter " + name + " in " + map);
+    }
+  }
+
+  private static final class IncorrectTypeException extends RuntimeException {
+    public IncorrectTypeException(String name, Map<String, Object> map, String type) {
+      super("required parameter " + name + " in " + map + " not " + type);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 31c47b4..41f3c92 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.runners.dataflow;
 
-import static org.apache.beam.sdk.util.Structs.addObject;
-import static org.apache.beam.sdk.util.Structs.getDictionary;
-import static org.apache.beam.sdk.util.Structs.getString;
+import static org.apache.beam.runners.dataflow.util.Structs.addObject;
+import static org.apache.beam.runners.dataflow.util.Structs.getDictionary;
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -60,6 +60,8 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecificat
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.runners.dataflow.util.OutputReference;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.util.Structs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -80,8 +82,6 @@ import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.GcsPathValidator;
 import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.Structs;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.util.state.StateSpec;

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index 2e66d43..64c0dbd 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -54,7 +54,6 @@ import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java
new file mode 100644
index 0000000..0d2bc9f
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.apache.beam.runners.dataflow.util.Structs.addBoolean;
+import static org.apache.beam.runners.dataflow.util.Structs.addDouble;
+import static org.apache.beam.runners.dataflow.util.Structs.addList;
+import static org.apache.beam.runners.dataflow.util.Structs.addLong;
+import static org.apache.beam.runners.dataflow.util.Structs.addLongs;
+import static org.apache.beam.runners.dataflow.util.Structs.addNull;
+import static org.apache.beam.runners.dataflow.util.Structs.addString;
+import static org.apache.beam.runners.dataflow.util.Structs.addStringList;
+import static org.apache.beam.runners.dataflow.util.Structs.getBoolean;
+import static org.apache.beam.runners.dataflow.util.Structs.getDictionary;
+import static org.apache.beam.runners.dataflow.util.Structs.getInt;
+import static org.apache.beam.runners.dataflow.util.Structs.getListOfMaps;
+import static org.apache.beam.runners.dataflow.util.Structs.getLong;
+import static org.apache.beam.runners.dataflow.util.Structs.getObject;
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
+import static org.apache.beam.runners.dataflow.util.Structs.getStrings;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for Structs.
+ */
+@RunWith(JUnit4.class)
+public class StructsTest {
+  private List<Map<String, Object>> makeCloudObjects() {
+    List<Map<String, Object>> objects = new ArrayList<>();
+    {
+      CloudObject o = CloudObject.forClassName("string");
+      addString(o, "singletonStringKey", "stringValue");
+      objects.add(o);
+    }
+    {
+      CloudObject o = CloudObject.forClassName("long");
+      addLong(o, "singletonLongKey", 42L);
+      objects.add(o);
+    }
+    return objects;
+  }
+
+  private Map<String, Object> makeCloudDictionary() {
+    Map<String, Object> o = new HashMap<>();
+    addList(o, "emptyKey", Collections.<Map<String, Object>>emptyList());
+    addNull(o, "noStringsKey");
+    addString(o, "singletonStringKey", "stringValue");
+    addStringList(o, "multipleStringsKey", Arrays.asList("hi", "there", "bob"));
+    addLongs(o, "multipleLongsKey", 47L, 1L << 42, -5L);
+    addLong(o, "singletonLongKey", 42L);
+    addDouble(o, "singletonDoubleKey", 3.14);
+    addBoolean(o, "singletonBooleanKey", true);
+    addNull(o, "noObjectsKey");
+    addList(o, "multipleObjectsKey", makeCloudObjects());
+    return o;
+  }
+
+  @Test
+  public void testGetStringParameter() throws Exception {
+    Map<String, Object> o = makeCloudDictionary();
+
+    Assert.assertEquals(
+        "stringValue",
+        getString(o, "singletonStringKey"));
+    Assert.assertEquals(
+        "stringValue",
+        getString(o, "singletonStringKey", "defaultValue"));
+    Assert.assertEquals(
+        "defaultValue",
+        getString(o, "missingKey", "defaultValue"));
+
+    try {
+      getString(o, "missingKey");
+      Assert.fail("should have thrown an exception");
+    } catch (Exception exn) {
+      Assert.assertThat(exn.toString(),
+                        Matchers.containsString(
+                            "didn't find required parameter missingKey"));
+    }
+
+    try {
+      getString(o, "noStringsKey");
+      Assert.fail("should have thrown an exception");
+    } catch (Exception exn) {
+      Assert.assertThat(exn.toString(),
+                        Matchers.containsString("not a string"));
+    }
+
+    Assert.assertThat(getStrings(o, "noStringsKey", null), Matchers.<String>emptyIterable());
+    Assert.assertThat(getObject(o, "noStringsKey").keySet(), Matchers.<String>emptyIterable());
+    Assert.assertThat(getDictionary(o, "noStringsKey").keySet(), Matchers.<String>emptyIterable());
+    Assert.assertThat(getDictionary(o, "noStringsKey", null).keySet(),
+        Matchers.<String>emptyIterable());
+
+    try {
+      getString(o, "multipleStringsKey");
+      Assert.fail("should have thrown an exception");
+    } catch (Exception exn) {
+      Assert.assertThat(exn.toString(),
+                        Matchers.containsString("not a string"));
+    }
+
+    try {
+      getString(o, "emptyKey");
+      Assert.fail("should have thrown an exception");
+    } catch (Exception exn) {
+      Assert.assertThat(exn.toString(),
+                        Matchers.containsString("not a string"));
+    }
+  }
+
+  @Test
+  public void testGetBooleanParameter() throws Exception {
+    Map<String, Object> o = makeCloudDictionary();
+
+    Assert.assertEquals(
+        true,
+        getBoolean(o, "singletonBooleanKey", false));
+    Assert.assertEquals(
+        false,
+        getBoolean(o, "missingKey", false));
+
+    try {
+      getBoolean(o, "emptyKey", false);
+      Assert.fail("should have thrown an exception");
+    } catch (Exception exn) {
+      Assert.assertThat(exn.toString(),
+                        Matchers.containsString("not a boolean"));
+    }
+  }
+
+  @Test
+  public void testGetLongParameter() throws Exception {
+    Map<String, Object> o = makeCloudDictionary();
+
+    Assert.assertEquals(
+        (Long) 42L,
+        getLong(o, "singletonLongKey", 666L));
+    Assert.assertEquals(
+        (Integer) 42,
+        getInt(o, "singletonLongKey", 666));
+    Assert.assertEquals(
+        (Long) 666L,
+        getLong(o, "missingKey", 666L));
+
+    try {
+      getLong(o, "emptyKey", 666L);
+      Assert.fail("should have thrown an exception");
+    } catch (Exception exn) {
+      Assert.assertThat(exn.toString(),
+                        Matchers.containsString("not a long"));
+    }
+    try {
+      getInt(o, "emptyKey", 666);
+      Assert.fail("should have thrown an exception");
+    } catch (Exception exn) {
+      Assert.assertThat(exn.toString(),
+                        Matchers.containsString("not an int"));
+    }
+  }
+
+  @Test
+  public void testGetListOfMaps() throws Exception {
+    Map<String, Object> o = makeCloudDictionary();
+
+    Assert.assertEquals(
+        makeCloudObjects(),
+        getListOfMaps(o, "multipleObjectsKey", null));
+
+    try {
+      getListOfMaps(o, "singletonLongKey", null);
+      Assert.fail("should have thrown an exception");
+    } catch (Exception exn) {
+      Assert.assertThat(exn.toString(),
+                        Matchers.containsString("not a list"));
+    }
+  }
+
+  // TODO: Test builder operations.
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
index 3585f3e..523b69b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
@@ -17,13 +17,8 @@
  */
 package org.apache.beam.sdk.coders;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.Collection;
 import java.util.List;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeParameter;
 
@@ -51,14 +46,6 @@ public class CollectionCoder<T> extends IterableLikeCoder<T, Collection<T>> {
     return decodedElements;
   }
 
-  @JsonCreator
-  public static CollectionCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-    return of(components.get(0));
-  }
-
   /**
    * Returns the first element in this collection if it is non-empty,
    * otherwise returns {@code null}.

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
index 2949ddb..02c3d0f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
@@ -17,12 +17,7 @@
  */
 package org.apache.beam.sdk.coders;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.List;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeParameter;
 
@@ -46,14 +41,6 @@ public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
     return decodedElements;
   }
 
-  @JsonCreator
-  public static IterableCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-    return of(components.get(0));
-  }
-
   /**
    * Returns the first element in this iterable if it is non-empty,
    * otherwise returns {@code null}.

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index b10db3a..8a689f7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -17,16 +17,11 @@
  */
 package org.apache.beam.sdk.coders;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -44,14 +39,6 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
     return new KvCoder<>(keyCoder, valueCoder);
   }
 
-  @JsonCreator
-  public static KvCoder<?, ?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    checkArgument(components.size() == 2, "Expecting 2 components, got %s", components.size());
-    return of(components.get(0), components.get(1));
-  }
-
   public static <K, V> List<Object> getInstanceComponents(
       KV<K, V> exampleValue) {
     return Arrays.asList(

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index be26531..685e766 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -17,11 +17,8 @@
  */
 package org.apache.beam.sdk.coders;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.ByteStreams;
 import java.io.ByteArrayOutputStream;
@@ -30,7 +27,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.VarInt;
 
 /**
@@ -48,15 +44,6 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> {
     return new LengthPrefixCoder<>(valueCoder);
   }
 
-  @JsonCreator
-  public static LengthPrefixCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    checkArgument(components.size() == 1,
-                  "Expecting 1 components, got " + components.size());
-    return of(components.get(0));
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   private final Coder<T> valueCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
index 6f7a0be..32467d2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
@@ -17,12 +17,7 @@
  */
 package org.apache.beam.sdk.coders;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.List;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeParameter;
 
@@ -45,14 +40,6 @@ public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
     return decodedElements;
   }
 
-  @JsonCreator
-  public static ListCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-    return of((Coder<?>) components.get(0));
-  }
-
   /**
    * Returns the first element in this list if it is non-empty,
    * otherwise returns {@code null}.

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
index 68ef3dc..da16165 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
@@ -17,14 +17,9 @@
  */
 package org.apache.beam.sdk.coders;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeParameter;
 
@@ -44,17 +39,6 @@ public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> {
   }
 
   /**
-   * Dynamically typed constructor for JSON deserialization.
-   */
-  @JsonCreator
-  public static SetCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-    return of(components.get(0));
-  }
-
-  /**
    * {@inheritDoc}
    *
    * @throws NonDeterministicException always. Sets are not ordered, but

http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java
deleted file mode 100644
index c9e7427..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * A utility for manipulating well-known cloud types.
- *
- * @deprecated replaced by {@code org.apache.beam.runners.dataflow.CloudKnownType}
- */
-@Deprecated
-enum CloudKnownType {
-  TEXT("http://schema.org/Text", String.class) {
-    @Override
-    public <T> T parse(Object value, Class<T> clazz) {
-      return clazz.cast(value);
-    }
-  },
-  BOOLEAN("http://schema.org/Boolean", Boolean.class) {
-    @Override
-    public <T> T parse(Object value, Class<T> clazz) {
-      return clazz.cast(value);
-    }
-  },
-  INTEGER("http://schema.org/Integer", Long.class, Integer.class) {
-    @Override
-    public <T> T parse(Object value, Class<T> clazz) {
-      Object result = null;
-      if (value.getClass() == clazz) {
-        result = value;
-      } else if (clazz == Long.class) {
-        if (value instanceof Integer) {
-          result = ((Integer) value).longValue();
-        } else if (value instanceof String) {
-          result = Long.valueOf((String) value);
-        }
-      } else if (clazz == Integer.class) {
-        if (value instanceof Long) {
-          result = ((Long) value).intValue();
-        } else if (value instanceof String) {
-          result = Integer.valueOf((String) value);
-        }
-      }
-      return clazz.cast(result);
-    }
-  },
-  FLOAT("http://schema.org/Float", Double.class, Float.class) {
-    @Override
-    public <T> T parse(Object value, Class<T> clazz) {
-      Object result = null;
-      if (value.getClass() == clazz) {
-        result = value;
-      } else if (clazz == Double.class) {
-        if (value instanceof Float) {
-          result = ((Float) value).doubleValue();
-        } else if (value instanceof String) {
-          result = Double.valueOf((String) value);
-        }
-      } else if (clazz == Float.class) {
-        if (value instanceof Double) {
-          result = ((Double) value).floatValue();
-        } else if (value instanceof String) {
-          result = Float.valueOf((String) value);
-        }
-      }
-      return clazz.cast(result);
-    }
-  };
-
-  private final String uri;
-  private final Class<?>[] classes;
-
-  CloudKnownType(String uri, Class<?>... classes) {
-    this.uri = uri;
-    this.classes = classes;
-  }
-
-  public String getUri() {
-    return uri;
-  }
-
-  public abstract <T> T parse(Object value, Class<T> clazz);
-
-  public Class<?> defaultClass() {
-    return classes[0];
-  }
-
-  private static final Map<String, CloudKnownType> typesByUri =
-      Collections.unmodifiableMap(buildTypesByUri());
-
-  private static Map<String, CloudKnownType> buildTypesByUri() {
-    Map<String, CloudKnownType> result = new HashMap<>();
-    for (CloudKnownType ty : CloudKnownType.values()) {
-      result.put(ty.getUri(), ty);
-    }
-    return result;
-  }
-
-  @Nullable
-  public static CloudKnownType forUri(@Nullable String uri) {
-    if (uri == null) {
-      return null;
-    }
-    return typesByUri.get(uri);
-  }
-
-  private static final Map<Class<?>, CloudKnownType> typesByClass =
-  Collections.unmodifiableMap(buildTypesByClass());
-
-  private static Map<Class<?>, CloudKnownType> buildTypesByClass() {
-    Map<Class<?>, CloudKnownType> result = new HashMap<>();
-    for (CloudKnownType ty : CloudKnownType.values()) {
-      for (Class<?> clazz : ty.classes) {
-        result.put(clazz, ty);
-      }
-    }
-    return result;
-  }
-
-  @Nullable
-  public static CloudKnownType forClass(Class<?> clazz) {
-    return typesByClass.get(clazz);
-  }
-}


[3/3] beam git commit: [BEAM-1871] Move GCP specific serialization CloudObject and supporting translation code to Dataflow runner module

Posted by lc...@apache.org.
[BEAM-1871] Move GCP specific serialization CloudObject and supporting translation code to Dataflow runner module

This closes #2862


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aafa1bba
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aafa1bba
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aafa1bba

Branch: refs/heads/master
Commit: aafa1bba984ac9744ef3f21c60a402aee3cec293
Parents: 320f9af a5627b1
Author: Luke Cwik <lc...@google.com>
Authored: Wed May 3 12:47:25 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed May 3 12:47:25 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |   2 +-
 .../dataflow/DataflowPipelineTranslator.java    |  18 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   2 +-
 .../beam/runners/dataflow/ReadTranslator.java   |   8 +-
 .../dataflow/internal/CustomSources.java        |   6 +-
 .../util/AvroCoderCloudObjectTranslator.java    |   2 -
 .../beam/runners/dataflow/util/CloudObject.java |   1 -
 .../runners/dataflow/util/CloudObjectKinds.java |   2 -
 .../dataflow/util/CloudObjectTranslator.java    |   2 -
 .../dataflow/util/CloudObjectTranslators.java   |  23 +-
 .../runners/dataflow/util/CloudObjects.java     |   1 -
 .../CoderCloudObjectTranslatorRegistrar.java    |   1 -
 .../runners/dataflow/util/PropertyNames.java    | 112 ++++++
 .../SerializableCoderCloudObjectTranslator.java |   2 -
 .../beam/runners/dataflow/util/Serializer.java  | 262 -------------
 .../beam/runners/dataflow/util/Structs.java     | 372 +++++++++++++++++++
 .../DataflowPipelineTranslatorTest.java         |  10 +-
 .../runners/dataflow/util/CloudObjectsTest.java |   1 -
 .../beam/runners/dataflow/util/StructsTest.java | 206 ++++++++++
 .../apache/beam/sdk/coders/CollectionCoder.java |  13 -
 .../apache/beam/sdk/coders/IterableCoder.java   |  13 -
 .../org/apache/beam/sdk/coders/KvCoder.java     |  13 -
 .../beam/sdk/coders/LengthPrefixCoder.java      |  13 -
 .../org/apache/beam/sdk/coders/ListCoder.java   |  13 -
 .../org/apache/beam/sdk/coders/SetCoder.java    |  16 -
 .../apache/beam/sdk/util/CloudKnownType.java    | 143 -------
 .../org/apache/beam/sdk/util/CloudObject.java   | 187 ----------
 .../org/apache/beam/sdk/util/CoderUtils.java    | 117 ------
 .../org/apache/beam/sdk/util/PropertyNames.java | 112 ------
 .../org/apache/beam/sdk/util/Serializer.java    | 147 --------
 .../java/org/apache/beam/sdk/util/Structs.java  | 371 ------------------
 .../java/org/apache/beam/sdk/util/Values.java   |  88 -----
 .../org/apache/beam/sdk/util/WindowedValue.java |  22 --
 .../org/apache/beam/sdk/values/TupleTag.java    |  22 --
 .../apache/beam/sdk/util/SerializerTest.java    | 162 --------
 .../org/apache/beam/sdk/util/StructsTest.java   | 206 ----------
 36 files changed, 729 insertions(+), 1962 deletions(-)
----------------------------------------------------------------------