You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/03 19:47:36 UTC
[1/3] beam git commit: [BEAM-1871] Move GCP specific serialization
CloudObject and supporting translation code to Dataflow runner module
Repository: beam
Updated Branches:
refs/heads/master 320f9affb -> aafa1bba9
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java
deleted file mode 100644
index 061e56a..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.util.Key;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * A representation of an arbitrary Java object to be instantiated by Dataflow
- * workers.
- *
- * <p>Typically, an object to be written by the SDK to the Dataflow service will
- * implement a method (typically called {@code asCloudObject()}) that returns a
- * {@code CloudObject} to represent the object in the protocol. Once the
- * {@code CloudObject} is constructed, the method should explicitly add
- * additional properties to be presented during deserialization, representing
- * child objects by building additional {@code CloudObject}s.
- *
- * @deprecated replaced by {@code org.apache.beam.runners.dataflow.CloudKnownType}
- */
-@Deprecated
-public final class CloudObject extends GenericJson {
- /**
- * Constructs a {@code CloudObject} by copying the supplied serialized object
- * spec, which must represent an SDK object serialized for transport via the
- * Dataflow API.
- *
- * <p>The most common use of this method is during deserialization on the worker,
- * where it's used as a binding type during instance construction.
- *
- * @param spec supplies the serialized form of the object as a nested map
- * @throws RuntimeException if the supplied map does not represent an SDK object
- */
- public static CloudObject fromSpec(Map<String, Object> spec) {
- CloudObject result = new CloudObject();
- result.putAll(spec);
- if (result.className == null) {
- throw new RuntimeException("Unable to create an SDK object from " + spec
- + ": Object class not specified (missing \""
- + PropertyNames.OBJECT_TYPE_NAME + "\" field)");
- }
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} to be used for serializing an instance of
- * the supplied class for transport via the Dataflow API. The instance
- * parameters to be serialized must be supplied explicitly after the
- * {@code CloudObject} is created, by using {@link CloudObject#put}.
- *
- * @param cls the class to use when deserializing the object on the worker
- */
- public static CloudObject forClass(Class<?> cls) {
- CloudObject result = new CloudObject();
- result.className = checkNotNull(cls).getName();
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} to be used for serializing data to be
- * deserialized using the supplied class name the supplied class name for
- * transport via the Dataflow API. The instance parameters to be serialized
- * must be supplied explicitly after the {@code CloudObject} is created, by
- * using {@link CloudObject#put}.
- *
- * @param className the class to use when deserializing the object on the worker
- */
- public static CloudObject forClassName(String className) {
- CloudObject result = new CloudObject();
- result.className = checkNotNull(className);
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} representing the given value.
- * @param value the scalar value to represent.
- */
- public static CloudObject forString(String value) {
- CloudObject result = forClassName(CloudKnownType.TEXT.getUri());
- result.put(PropertyNames.SCALAR_FIELD_NAME, value);
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} representing the given value.
- * @param value the scalar value to represent.
- */
- public static CloudObject forBoolean(Boolean value) {
- CloudObject result = forClassName(CloudKnownType.BOOLEAN.getUri());
- result.put(PropertyNames.SCALAR_FIELD_NAME, value);
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} representing the given value.
- * @param value the scalar value to represent.
- */
- public static CloudObject forInteger(Long value) {
- CloudObject result = forClassName(CloudKnownType.INTEGER.getUri());
- result.put(PropertyNames.SCALAR_FIELD_NAME, value);
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} representing the given value.
- * @param value the scalar value to represent.
- */
- public static CloudObject forInteger(Integer value) {
- CloudObject result = forClassName(CloudKnownType.INTEGER.getUri());
- result.put(PropertyNames.SCALAR_FIELD_NAME, value);
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} representing the given value.
- * @param value the scalar value to represent.
- */
- public static CloudObject forFloat(Float value) {
- CloudObject result = forClassName(CloudKnownType.FLOAT.getUri());
- result.put(PropertyNames.SCALAR_FIELD_NAME, value);
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} representing the given value.
- * @param value the scalar value to represent.
- */
- public static CloudObject forFloat(Double value) {
- CloudObject result = forClassName(CloudKnownType.FLOAT.getUri());
- result.put(PropertyNames.SCALAR_FIELD_NAME, value);
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} representing the given value of a
- * well-known cloud object type.
- * @param value the scalar value to represent.
- * @throws RuntimeException if the value does not have a
- * {@link CloudKnownType} mapping
- */
- public static CloudObject forKnownType(Object value) {
- @Nullable CloudKnownType ty = CloudKnownType.forClass(value.getClass());
- if (ty == null) {
- throw new RuntimeException("Unable to represent value via the Dataflow API: " + value);
- }
- CloudObject result = forClassName(ty.getUri());
- result.put(PropertyNames.SCALAR_FIELD_NAME, value);
- return result;
- }
-
- @Key(PropertyNames.OBJECT_TYPE_NAME)
- private String className;
-
- private CloudObject() {}
-
- /**
- * Gets the name of the Java class that this CloudObject represents.
- */
- public String getClassName() {
- return className;
- }
-
- @Override
- public CloudObject clone() {
- return (CloudObject) super.clone();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
index 2d21561..3380a10 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
@@ -17,19 +17,8 @@
*/
package org.apache.beam.sdk.util;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
-import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
-import com.fasterxml.jackson.databind.DatabindContext;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
-import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.api.client.util.Base64;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -37,15 +26,8 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.lang.ref.SoftReference;
import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.TypeVariable;
-import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.LengthPrefixCoder;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
@@ -54,17 +36,6 @@ import org.apache.beam.sdk.values.TypeDescriptor;
public final class CoderUtils {
private CoderUtils() {} // Non-instantiable
- /** A mapping from well known coder types to their implementing classes. */
- private static final Map<String, Class<?>> WELL_KNOWN_CODER_TYPES =
- ImmutableMap.<String, Class<?>>builder()
- .put("kind:pair", KvCoder.class)
- .put("kind:stream", IterableCoder.class)
- .put("kind:global_window", GlobalWindow.Coder.class)
- .put("kind:interval_window", IntervalWindow.IntervalWindowCoder.class)
- .put("kind:length_prefix", LengthPrefixCoder.class)
- .put("kind:windowed_value", WindowedValue.FullWindowedValueCoder.class)
- .build();
-
private static ThreadLocal<SoftReference<ExposedByteArrayOutputStream>>
threadLocalOutputStream = new ThreadLocal<>();
@@ -214,92 +185,4 @@ public final class CoderUtils {
TypeDescriptor codedType = TypeDescriptor.of(coderType.getActualTypeArguments()[0]);
return codedType;
}
-
- /**
- * A {@link com.fasterxml.jackson.databind.Module} that adds the type
- * resolver needed for Coder definitions.
- *
- * <p>Used only in {@link Serializer}, which will move modules
- */
- @Deprecated
- static final class Jackson2Module extends SimpleModule {
- /**
- * The Coder custom type resolver.
- *
- * <p>This resolver resolves coders. If the Coder ID is a particular
- * well-known identifier, it's replaced with the corresponding class.
- * All other Coder instances are resolved by class name, using the package
- * org.apache.beam.sdk.coders if there are no "."s in the ID.
- */
- private static final class Resolver extends TypeIdResolverBase {
- @SuppressWarnings("unused") // Used via @JsonTypeIdResolver annotation on Mixin
- public Resolver() {
- super(TypeFactory.defaultInstance().constructType(Coder.class),
- TypeFactory.defaultInstance());
- }
-
- @Override
- public JavaType typeFromId(DatabindContext context, String id) {
- Class<?> clazz = getClassForId(id);
- @SuppressWarnings("rawtypes")
- TypeVariable[] tvs = clazz.getTypeParameters();
- JavaType[] types = new JavaType[tvs.length];
- for (int lupe = 0; lupe < tvs.length; lupe++) {
- types[lupe] = TypeFactory.unknownType();
- }
- return _typeFactory.constructSimpleType(clazz, types);
- }
-
- private Class<?> getClassForId(String id) {
- try {
- if (id.contains(".")) {
- return Class.forName(id);
- }
-
- if (WELL_KNOWN_CODER_TYPES.containsKey(id)) {
- return WELL_KNOWN_CODER_TYPES.get(id);
- }
-
- // Otherwise, see if the ID is the name of a class in
- // org.apache.beam.sdk.coders. We do this via creating
- // the class object so that class loaders have a chance to get
- // involved -- and since we need the class object anyway.
- return Class.forName(Coder.class.getPackage().getName() + "." + id);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Unable to convert coder ID " + id + " to class", e);
- }
- }
-
- @Override
- public String idFromValueAndType(Object o, Class<?> clazz) {
- return clazz.getName();
- }
-
- @Override
- public String idFromValue(Object o) {
- return o.getClass().getName();
- }
-
- @Override
- public JsonTypeInfo.Id getMechanism() {
- return JsonTypeInfo.Id.CUSTOM;
- }
- }
-
- /**
- * The mixin class defining how Coders are handled by the deserialization
- * {@link ObjectMapper}.
- *
- * <p>This is done via a mixin so that this resolver is <i>only</i> used
- * during deserialization requested by the Apache Beam SDK.
- */
- @JsonTypeIdResolver(Resolver.class)
- @JsonTypeInfo(use = Id.CUSTOM, include = As.PROPERTY, property = PropertyNames.OBJECT_TYPE_NAME)
- private static final class Mixin {}
-
- public Jackson2Module() {
- super("BeamCoders");
- setMixInAnnotation(Coder.class, Mixin.class);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
deleted file mode 100644
index aa5855b..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-/**
- * Constant property names used by the SDK in CloudWorkflow specifications.
- */
-public class PropertyNames {
- public static final String ALLOWED_ENCODINGS = "allowed_encodings";
- public static final String APPEND_TRAILING_NEWLINES = "append_trailing_newlines";
- public static final String BIGQUERY_CREATE_DISPOSITION = "create_disposition";
- public static final String BIGQUERY_DATASET = "dataset";
- public static final String BIGQUERY_PROJECT = "project";
- public static final String BIGQUERY_SCHEMA = "schema";
- public static final String BIGQUERY_TABLE = "table";
- public static final String BIGQUERY_QUERY = "bigquery_query";
- public static final String BIGQUERY_FLATTEN_RESULTS = "bigquery_flatten_results";
- public static final String BIGQUERY_USE_LEGACY_SQL = "bigquery_use_legacy_sql";
- public static final String BIGQUERY_WRITE_DISPOSITION = "write_disposition";
- public static final String BIGQUERY_EXPORT_FORMAT = "bigquery_export_format";
- public static final String BIGQUERY_EXPORT_SCHEMA = "bigquery_export_schema";
- public static final String CO_GBK_RESULT_SCHEMA = "co_gbk_result_schema";
- public static final String COMBINE_FN = "combine_fn";
- public static final String COMPONENT_ENCODINGS = "component_encodings";
- public static final String COMPRESSION_TYPE = "compression_type";
- public static final String CUSTOM_SOURCE_FORMAT = "custom_source";
- public static final String CONCAT_SOURCE_SOURCES = "sources";
- public static final String CONCAT_SOURCE_BASE_SPECS = "base_specs";
- public static final String SOURCE_STEP_INPUT = "custom_source_step_input";
- public static final String SOURCE_SPEC = "spec";
- public static final String SOURCE_METADATA = "metadata";
- public static final String SOURCE_DOES_NOT_NEED_SPLITTING = "does_not_need_splitting";
- public static final String SOURCE_PRODUCES_SORTED_KEYS = "produces_sorted_keys";
- public static final String SOURCE_IS_INFINITE = "is_infinite";
- public static final String SOURCE_ESTIMATED_SIZE_BYTES = "estimated_size_bytes";
- public static final String ELEMENT = "element";
- public static final String ELEMENTS = "elements";
- public static final String ENCODING = "encoding";
- public static final String ENCODING_ID = "encoding_id";
- public static final String END_INDEX = "end_index";
- public static final String END_OFFSET = "end_offset";
- public static final String END_SHUFFLE_POSITION = "end_shuffle_position";
- public static final String ENVIRONMENT_VERSION_JOB_TYPE_KEY = "job_type";
- public static final String ENVIRONMENT_VERSION_MAJOR_KEY = "major";
- public static final String FILENAME = "filename";
- public static final String FILENAME_PREFIX = "filename_prefix";
- public static final String FILENAME_SUFFIX = "filename_suffix";
- public static final String FILEPATTERN = "filepattern";
- public static final String FOOTER = "footer";
- public static final String FORMAT = "format";
- public static final String HEADER = "header";
- public static final String INPUTS = "inputs";
- public static final String INPUT_CODER = "input_coder";
- public static final String IS_GENERATED = "is_generated";
- public static final String IS_MERGING_WINDOW_FN = "is_merging_window_fn";
- public static final String IS_PAIR_LIKE = "is_pair_like";
- public static final String IS_STREAM_LIKE = "is_stream_like";
- public static final String IS_WRAPPER = "is_wrapper";
- public static final String DISALLOW_COMBINER_LIFTING = "disallow_combiner_lifting";
- public static final String NON_PARALLEL_INPUTS = "non_parallel_inputs";
- public static final String NUM_SHARD_CODERS = "num_shard_coders";
- public static final String NUM_METADATA_SHARD_CODERS = "num_metadata_shard_coders";
- public static final String NUM_SHARDS = "num_shards";
- public static final String OBJECT_TYPE_NAME = "@type";
- public static final String OUTPUT = "output";
- public static final String OUTPUT_INFO = "output_info";
- public static final String OUTPUT_NAME = "output_name";
- public static final String PARALLEL_INPUT = "parallel_input";
- public static final String PHASE = "phase";
- public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label";
- public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = "pubsub_serialized_attributes_fn";
- public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
- public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override";
- public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = "pubsub_timestamp_label";
- public static final String PUBSUB_TOPIC = "pubsub_topic";
- public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override";
- public static final String SCALAR_FIELD_NAME = "value";
- public static final String SERIALIZED_FN = "serialized_fn";
- public static final String SHARD_NAME_TEMPLATE = "shard_template";
- public static final String SHUFFLE_KIND = "shuffle_kind";
- public static final String SHUFFLE_READER_CONFIG = "shuffle_reader_config";
- public static final String SHUFFLE_WRITER_CONFIG = "shuffle_writer_config";
- public static final String SORT_VALUES = "sort_values";
- public static final String START_INDEX = "start_index";
- public static final String START_OFFSET = "start_offset";
- public static final String START_SHUFFLE_POSITION = "start_shuffle_position";
- public static final String STRIP_TRAILING_NEWLINES = "strip_trailing_newlines";
- public static final String TUPLE_TAGS = "tuple_tags";
- public static final String USE_INDEXED_FORMAT = "use_indexed_format";
- public static final String USER_FN = "user_fn";
- public static final String USER_NAME = "user_name";
- public static final String USES_KEYED_STATE = "uses_keyed_state";
- public static final String VALIDATE_SINK = "validate_sink";
- public static final String VALIDATE_SOURCE = "validate_source";
- public static final String VALUE = "value";
- public static final String DISPLAY_DATA = "display_data";
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java
deleted file mode 100644
index 166e4e7..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * Utility for converting objects between Java and Cloud representations.
- *
- * @deprecated replaced by {@code org.apache.beam.runners.dataflow.util.Serializer}
- */
-@Deprecated
-public final class Serializer {
- // Delay initialization of statics until the first call to Serializer.
- private static class SingletonHelper {
- static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
- static final ObjectMapper TREE_MAPPER = createTreeMapper();
-
- /**
- * Creates the object mapper that will be used for serializing Google API
- * client maps into Jackson trees.
- */
- private static ObjectMapper createTreeMapper() {
- return new ObjectMapper();
- }
-
- /**
- * Creates the object mapper that will be used for deserializing Jackson
- * trees into objects.
- */
- private static ObjectMapper createObjectMapper() {
- ObjectMapper m = new ObjectMapper();
- // Ignore properties that are not used by the object.
- m.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
-
- // For parameters of type Object, use the @type property to determine the
- // class to instantiate.
- //
- // TODO: It would be ideal to do this for all non-final classes. The
- // problem with using DefaultTyping.NON_FINAL is that it insists on having
- // type information in the JSON for classes with useful default
- // implementations, such as List. Ideally, we'd combine these defaults
- // with available type information if that information's present.
- m.enableDefaultTypingAsProperty(
- ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT,
- PropertyNames.OBJECT_TYPE_NAME);
-
- m.registerModule(new CoderUtils.Jackson2Module());
-
- return m;
- }
- }
-
- /**
- * Deserializes an object from a Dataflow structured encoding (represented in
- * Java as a map).
- *
- * <p>The standard Dataflow SDK object serialization protocol is based on JSON.
- * Data is typically encoded as a JSON object whose fields represent the
- * object's data.
- *
- * <p>The actual deserialization is performed by Jackson, which can deserialize
- * public fields, use JavaBean setters, or use injection annotations to
- * indicate how to construct the object. The {@link ObjectMapper} used is
- * configured to use the "@type" field as the name of the class to instantiate
- * (supporting polymorphic types), and may be further configured by
- * annotations or via {@link ObjectMapper#registerModule}.
- *
- * @see <a href="http://wiki.fasterxml.com/JacksonFAQ#Data_Binding.2C_general">
- * Jackson Data-Binding</a>
- * @see <a href="https://github.com/FasterXML/jackson-annotations/wiki/Jackson-Annotations">
- * Jackson-Annotations</a>
- * @param serialized the object in untyped decoded form (i.e. a nested {@link Map})
- * @param clazz the expected object class
- */
- public static <T> T deserialize(Map<String, Object> serialized, Class<T> clazz) {
- try {
- return SingletonHelper.OBJECT_MAPPER.treeToValue(
- SingletonHelper.TREE_MAPPER.valueToTree(
- deserializeCloudKnownTypes(serialized)),
- clazz);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(
- "Unable to deserialize class " + clazz, e);
- }
- }
-
- /**
- * Recursively walks the supplied map, looking for well-known cloud type
- * information (keyed as {@link PropertyNames#OBJECT_TYPE_NAME}, matching a
- * URI value from the {@link CloudKnownType} enum. Upon finding this type
- * information, it converts it into the correspondingly typed Java value.
- */
- @SuppressWarnings("unchecked")
- private static Object deserializeCloudKnownTypes(Object src) {
- if (src instanceof Map) {
- Map<String, Object> srcMap = (Map<String, Object>) src;
- @Nullable Object value = srcMap.get(PropertyNames.SCALAR_FIELD_NAME);
- @Nullable CloudKnownType type =
- CloudKnownType.forUri((String) srcMap.get(PropertyNames.OBJECT_TYPE_NAME));
- if (type != null && value != null) {
- // It's a value of a well-known cloud type; let the known type handler
- // handle the translation.
- Object result = type.parse(value, type.defaultClass());
- return result;
- }
- // Otherwise, it's just an ordinary map.
- Map<String, Object> dest = new HashMap<>(srcMap.size());
- for (Map.Entry<String, Object> entry : srcMap.entrySet()) {
- dest.put(entry.getKey(), deserializeCloudKnownTypes(entry.getValue()));
- }
- return dest;
- }
- if (src instanceof List) {
- List<Object> srcList = (List<Object>) src;
- List<Object> dest = new ArrayList<>(srcList.size());
- for (Object obj : srcList) {
- dest.add(deserializeCloudKnownTypes(obj));
- }
- return dest;
- }
- // Neither a Map nor a List; no translation needed.
- return src;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java
deleted file mode 100644
index a4be054..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.api.client.util.Data;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * A collection of static methods for manipulating datastructure representations transferred via the
- * Dataflow API.
- */
-public final class Structs {
- private Structs() {} // Non-instantiable
-
- public static String getString(Map<String, Object> map, String name) {
- return getValue(map, name, String.class, "a string");
- }
-
- public static String getString(
- Map<String, Object> map, String name, @Nullable String defaultValue) {
- return getValue(map, name, String.class, "a string", defaultValue);
- }
-
- public static byte[] getBytes(Map<String, Object> map, String name) {
- @Nullable byte[] result = getBytes(map, name, null);
- if (result == null) {
- throw new ParameterNotFoundException(name, map);
- }
- return result;
- }
-
- @Nullable
- public static byte[] getBytes(
- Map<String, Object> map, String name, @Nullable byte[] defaultValue) {
- @Nullable String jsonString = getString(map, name, null);
- if (jsonString == null) {
- return defaultValue;
- }
- // TODO: Need to agree on a format for encoding bytes in
- // a string that can be sent to the backend, over the cloud
- // map task work API. base64 encoding seems pretty common. Switch to it?
- return StringUtils.jsonStringToByteArray(jsonString);
- }
-
- public static Boolean getBoolean(Map<String, Object> map, String name) {
- return getValue(map, name, Boolean.class, "a boolean");
- }
-
- @Nullable
- public static Boolean getBoolean(
- Map<String, Object> map, String name, @Nullable Boolean defaultValue) {
- return getValue(map, name, Boolean.class, "a boolean", defaultValue);
- }
-
- public static Long getLong(Map<String, Object> map, String name) {
- return getValue(map, name, Long.class, "a long");
- }
-
- @Nullable
- public static Long getLong(Map<String, Object> map, String name, @Nullable Long defaultValue) {
- return getValue(map, name, Long.class, "a long", defaultValue);
- }
-
- public static Integer getInt(Map<String, Object> map, String name) {
- return getValue(map, name, Integer.class, "an int");
- }
-
- @Nullable
- public static Integer getInt(
- Map<String, Object> map, String name, @Nullable Integer defaultValue) {
- return getValue(map, name, Integer.class, "an int", defaultValue);
- }
-
- @Nullable
- public static List<String> getStrings(
- Map<String, Object> map, String name, @Nullable List<String> defaultValue) {
- @Nullable Object value = map.get(name);
- if (value == null) {
- if (map.containsKey(name)) {
- throw new IncorrectTypeException(name, map, "a string or a list");
- }
- return defaultValue;
- }
- if (Data.isNull(value)) {
- // This is a JSON literal null. When represented as a list of strings,
- // this is an empty list.
- return Collections.<String>emptyList();
- }
- @Nullable String singletonString = decodeValue(value, String.class);
- if (singletonString != null) {
- return Collections.singletonList(singletonString);
- }
- if (!(value instanceof List)) {
- throw new IncorrectTypeException(name, map, "a string or a list");
- }
- @SuppressWarnings("unchecked")
- List<Object> elements = (List<Object>) value;
- List<String> result = new ArrayList<>(elements.size());
- for (Object o : elements) {
- @Nullable String s = decodeValue(o, String.class);
- if (s == null) {
- throw new IncorrectTypeException(name, map, "a list of strings");
- }
- result.add(s);
- }
- return result;
- }
-
- public static Map<String, Object> getObject(Map<String, Object> map, String name) {
- @Nullable Map<String, Object> result = getObject(map, name, null);
- if (result == null) {
- throw new ParameterNotFoundException(name, map);
- }
- return result;
- }
-
- @Nullable
- public static Map<String, Object> getObject(
- Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue) {
- @Nullable Object value = map.get(name);
- if (value == null) {
- if (map.containsKey(name)) {
- throw new IncorrectTypeException(name, map, "an object");
- }
- return defaultValue;
- }
- return checkObject(value, map, name);
- }
-
- private static Map<String, Object> checkObject(
- Object value, Map<String, Object> map, String name) {
- if (Data.isNull(value)) {
- // This is a JSON literal null. When represented as an object, this is an
- // empty map.
- return Collections.<String, Object>emptyMap();
- }
- if (!(value instanceof Map)) {
- throw new IncorrectTypeException(name, map, "an object (not a map)");
- }
- @SuppressWarnings("unchecked")
- Map<String, Object> mapValue = (Map<String, Object>) value;
- if (!mapValue.containsKey(PropertyNames.OBJECT_TYPE_NAME)) {
- throw new IncorrectTypeException(
- name, map, "an object (no \"" + PropertyNames.OBJECT_TYPE_NAME + "\" field)");
- }
- return mapValue;
- }
-
- @Nullable
- public static List<Map<String, Object>> getListOfMaps(
- Map<String, Object> map, String name, @Nullable List<Map<String, Object>> defaultValue) {
- @Nullable Object value = map.get(name);
- if (value == null) {
- if (map.containsKey(name)) {
- throw new IncorrectTypeException(name, map, "a list");
- }
- return defaultValue;
- }
- if (Data.isNull(value)) {
- // This is a JSON literal null. When represented as a list,
- // this is an empty list.
- return Collections.<Map<String, Object>>emptyList();
- }
-
- if (!(value instanceof List)) {
- throw new IncorrectTypeException(name, map, "a list");
- }
-
- List<?> elements = (List<?>) value;
- for (Object elem : elements) {
- if (!(elem instanceof Map)) {
- throw new IncorrectTypeException(name, map, "a list of Map objects");
- }
- }
-
- @SuppressWarnings("unchecked")
- List<Map<String, Object>> result = (List<Map<String, Object>>) elements;
- return result;
- }
-
- public static Map<String, Object> getDictionary(Map<String, Object> map, String name) {
- @Nullable Object value = map.get(name);
- if (value == null) {
- throw new ParameterNotFoundException(name, map);
- }
- if (Data.isNull(value)) {
- // This is a JSON literal null. When represented as a dictionary, this is
- // an empty map.
- return Collections.<String, Object>emptyMap();
- }
- if (!(value instanceof Map)) {
- throw new IncorrectTypeException(name, map, "a dictionary");
- }
- @SuppressWarnings("unchecked")
- Map<String, Object> result = (Map<String, Object>) value;
- return result;
- }
-
- @Nullable
- public static Map<String, Object> getDictionary(
- Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue) {
- @Nullable Object value = map.get(name);
- if (value == null) {
- if (map.containsKey(name)) {
- throw new IncorrectTypeException(name, map, "a dictionary");
- }
- return defaultValue;
- }
- if (Data.isNull(value)) {
- // This is a JSON literal null. When represented as a dictionary, this is
- // an empty map.
- return Collections.<String, Object>emptyMap();
- }
- if (!(value instanceof Map)) {
- throw new IncorrectTypeException(name, map, "a dictionary");
- }
- @SuppressWarnings("unchecked")
- Map<String, Object> result = (Map<String, Object>) value;
- return result;
- }
-
- // Builder operations.
-
- public static void addString(Map<String, Object> map, String name, String value) {
- addObject(map, name, CloudObject.forString(value));
- }
-
- public static void addBoolean(Map<String, Object> map, String name, boolean value) {
- addObject(map, name, CloudObject.forBoolean(value));
- }
-
- public static void addLong(Map<String, Object> map, String name, long value) {
- addObject(map, name, CloudObject.forInteger(value));
- }
-
- public static void addObject(Map<String, Object> map, String name, Map<String, Object> value) {
- map.put(name, value);
- }
-
- public static void addNull(Map<String, Object> map, String name) {
- map.put(name, Data.nullOf(Object.class));
- }
-
- public static void addLongs(Map<String, Object> map, String name, long... longs) {
- List<Map<String, Object>> elements = new ArrayList<>(longs.length);
- for (Long value : longs) {
- elements.add(CloudObject.forInteger(value));
- }
- map.put(name, elements);
- }
-
- public static void addList(
- Map<String, Object> map, String name, List<? extends Map<String, Object>> elements) {
- map.put(name, elements);
- }
-
- public static void addStringList(Map<String, Object> map, String name, List<String> elements) {
- ArrayList<CloudObject> objects = new ArrayList<>(elements.size());
- for (String element : elements) {
- objects.add(CloudObject.forString(element));
- }
- addList(map, name, objects);
- }
-
- public static <T extends Map<String, Object>> void addList(
- Map<String, Object> map, String name, T[] elements) {
- map.put(name, Arrays.asList(elements));
- }
-
- public static void addDictionary(
- Map<String, Object> map, String name, Map<String, Object> value) {
- map.put(name, value);
- }
-
- public static void addDouble(Map<String, Object> map, String name, Double value) {
- addObject(map, name, CloudObject.forFloat(value));
- }
-
- // Helper methods for a few of the accessor methods.
-
- private static <T> T getValue(Map<String, Object> map, String name, Class<T> clazz, String type) {
- @Nullable T result = getValue(map, name, clazz, type, null);
- if (result == null) {
- throw new ParameterNotFoundException(name, map);
- }
- return result;
- }
-
- @Nullable
- private static <T> T getValue(
- Map<String, Object> map, String name, Class<T> clazz, String type, @Nullable T defaultValue) {
- @Nullable Object value = map.get(name);
- if (value == null) {
- if (map.containsKey(name)) {
- throw new IncorrectTypeException(name, map, type);
- }
- return defaultValue;
- }
- T result = decodeValue(value, clazz);
- if (result == null) {
- // The value exists, but can't be decoded.
- throw new IncorrectTypeException(name, map, type);
- }
- return result;
- }
-
- @Nullable
- private static <T> T decodeValue(Object value, Class<T> clazz) {
- try {
- if (value.getClass() == clazz) {
- // decodeValue() is only called for final classes; if the class matches,
- // it's safe to just return the value, and if it doesn't match, decoding
- // is needed.
- return clazz.cast(value);
- }
- if (!(value instanceof Map)) {
- return null;
- }
- @SuppressWarnings("unchecked")
- Map<String, Object> map = (Map<String, Object>) value;
- @Nullable String typeName = (String) map.get(PropertyNames.OBJECT_TYPE_NAME);
- if (typeName == null) {
- return null;
- }
- @Nullable CloudKnownType knownType = CloudKnownType.forUri(typeName);
- if (knownType == null) {
- return null;
- }
- @Nullable Object scalar = map.get(PropertyNames.SCALAR_FIELD_NAME);
- if (scalar == null) {
- return null;
- }
- return knownType.parse(scalar, clazz);
- } catch (ClassCastException e) {
- // If any class cast fails during decoding, the value's not decodable.
- return null;
- }
- }
-
- private static final class ParameterNotFoundException extends RuntimeException {
- public ParameterNotFoundException(String name, Map<String, Object> map) {
- super("didn't find required parameter " + name + " in " + map);
- }
- }
-
- private static final class IncorrectTypeException extends RuntimeException {
- public IncorrectTypeException(String name, Map<String, Object> map, String type) {
- super("required parameter " + name + " in " + map + " not " + type);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Values.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Values.java
deleted file mode 100644
index d8aa046..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Values.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * A collection of static methods for manipulating value representations
- * transfered via the Dataflow API.
- */
-public final class Values {
- private Values() {} // Non-instantiable
-
- public static Boolean asBoolean(Object value) throws ClassCastException {
- @Nullable Boolean knownResult = checkKnownValue(CloudKnownType.BOOLEAN, value, Boolean.class);
- if (knownResult != null) {
- return knownResult;
- }
- return Boolean.class.cast(value);
- }
-
- public static Double asDouble(Object value) throws ClassCastException {
- @Nullable Double knownResult = checkKnownValue(CloudKnownType.FLOAT, value, Double.class);
- if (knownResult != null) {
- return knownResult;
- }
- if (value instanceof Double) {
- return (Double) value;
- }
- return ((Float) value).doubleValue();
- }
-
- public static Long asLong(Object value) throws ClassCastException {
- @Nullable Long knownResult = checkKnownValue(CloudKnownType.INTEGER, value, Long.class);
- if (knownResult != null) {
- return knownResult;
- }
- if (value instanceof Long) {
- return (Long) value;
- }
- return ((Integer) value).longValue();
- }
-
- public static String asString(Object value) throws ClassCastException {
- @Nullable String knownResult = checkKnownValue(CloudKnownType.TEXT, value, String.class);
- if (knownResult != null) {
- return knownResult;
- }
- return String.class.cast(value);
- }
-
- @Nullable
- private static <T> T checkKnownValue(CloudKnownType type, Object value, Class<T> clazz) {
- if (!(value instanceof Map)) {
- return null;
- }
- Map<String, Object> map = (Map<String, Object>) value;
- @Nullable String typeName = (String) map.get(PropertyNames.OBJECT_TYPE_NAME);
- if (typeName == null) {
- return null;
- }
- @Nullable CloudKnownType knownType = CloudKnownType.forUri(typeName);
- if (knownType == null || knownType != type) {
- return null;
- }
- @Nullable Object scalar = map.get(PropertyNames.SCALAR_FIELD_NAME);
- if (scalar == null) {
- return null;
- }
- return knownType.parse(scalar, clazz);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 23666ca..13e499d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -17,11 +17,8 @@
*/
package org.apache.beam.sdk.util;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
@@ -610,17 +607,6 @@ public abstract class WindowedValue<T> {
return new FullWindowedValueCoder<>(valueCoder, windowCoder);
}
- @JsonCreator
- public static FullWindowedValueCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- checkArgument(components.size() == 2,
- "Expecting 2 components, got " + components.size());
- @SuppressWarnings("unchecked")
- Coder<? extends BoundedWindow> window = (Coder<? extends BoundedWindow>) components.get(1);
- return of(components.get(0), window);
- }
-
FullWindowedValueCoder(Coder<T> valueCoder,
Coder<? extends BoundedWindow> windowCoder) {
super(valueCoder);
@@ -717,14 +703,6 @@ public abstract class WindowedValue<T> {
return new ValueOnlyWindowedValueCoder<>(valueCoder);
}
- @JsonCreator
- public static ValueOnlyWindowedValueCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size());
- return of(components.get(0));
- }
-
ValueOnlyWindowedValueCoder(Coder<T> valueCoder) {
super(valueCoder);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
index 37d41f7..c5e04b0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
@@ -17,18 +17,11 @@
*/
package org.apache.beam.sdk.values;
-import static org.apache.beam.sdk.util.Structs.addBoolean;
-import static org.apache.beam.sdk.util.Structs.addString;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import java.io.Serializable;
import java.util.Random;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.PropertyNames;
/**
* A {@link TupleTag} is a typed tag to use as the key of a heterogeneously typed tuple, like {@link
@@ -153,26 +146,11 @@ public class TupleTag<V> implements Serializable {
return caller + "#" + nonce;
}
- @JsonCreator
- @SuppressWarnings("unused")
- private static TupleTag<?> fromJson(
- @JsonProperty(PropertyNames.VALUE) String id,
- @JsonProperty(PropertyNames.IS_GENERATED) boolean generated) {
- return new TupleTag<>(id, generated);
- }
-
private TupleTag(String id, boolean generated) {
this.id = id;
this.generated = generated;
}
- public CloudObject asCloudObject() {
- CloudObject result = CloudObject.forClass(getClass());
- addString(result, PropertyNames.VALUE, id);
- addBoolean(result, PropertyNames.IS_GENERATED, generated);
- return result;
- }
-
@Override
public boolean equals(Object that) {
if (that instanceof TupleTag) {
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java
deleted file mode 100644
index f6bacc4..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.apache.beam.sdk.util.Structs.addBoolean;
-import static org.apache.beam.sdk.util.Structs.addDouble;
-import static org.apache.beam.sdk.util.Structs.addLong;
-import static org.apache.beam.sdk.util.Structs.addString;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests Serializer implementation.
- */
-@RunWith(JUnit4.class)
-public class SerializerTest {
- /**
- * A POJO to use for testing serialization.
- */
- @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY,
- property = PropertyNames.OBJECT_TYPE_NAME)
- public static class TestRecord {
- // TODO: When we apply property name typing to all non-final classes, the
- // annotation on this class should be removed.
- public String name;
- public boolean ok;
- public int value;
- public double dValue;
- }
-
- @Test
- public void testStatefulDeserialization() {
- CloudObject object = CloudObject.forClass(TestRecord.class);
-
- addString(object, "name", "foobar");
- addBoolean(object, "ok", true);
- addLong(object, "value", 42L);
- addDouble(object, "dValue", .25);
-
- TestRecord record = Serializer.deserialize(object, TestRecord.class);
- Assert.assertEquals("foobar", record.name);
- Assert.assertEquals(true, record.ok);
- Assert.assertEquals(42L, record.value);
- Assert.assertEquals(0.25, record.dValue, 0.0001);
- }
-
- private static class InjectedTestRecord {
- private final String n;
- private final int v;
-
- @SuppressWarnings("unused") // used for JSON serialization
- public InjectedTestRecord(
- @JsonProperty("name") String name,
- @JsonProperty("value") int value) {
- this.n = name;
- this.v = value;
- }
-
- public String getName() {
- return n;
- }
- public int getValue() {
- return v;
- }
- }
-
- @Test
- public void testDeserializationInjection() {
- CloudObject object = CloudObject.forClass(InjectedTestRecord.class);
- addString(object, "name", "foobar");
- addLong(object, "value", 42L);
-
- InjectedTestRecord record =
- Serializer.deserialize(object, InjectedTestRecord.class);
-
- Assert.assertEquals("foobar", record.getName());
- Assert.assertEquals(42L, record.getValue());
- }
-
- private static class FactoryInjectedTestRecord {
- @JsonCreator
- public static FactoryInjectedTestRecord of(
- @JsonProperty("name") String name,
- @JsonProperty("value") int value) {
- return new FactoryInjectedTestRecord(name, value);
- }
-
- private final String n;
- private final int v;
-
- private FactoryInjectedTestRecord(String name, int value) {
- this.n = name;
- this.v = value;
- }
-
- public String getName() {
- return n;
- }
- public int getValue() {
- return v;
- }
- }
-
- @Test
- public void testDeserializationFactoryInjection() {
- CloudObject object = CloudObject.forClass(FactoryInjectedTestRecord.class);
- addString(object, "name", "foobar");
- addLong(object, "value", 42L);
-
- FactoryInjectedTestRecord record =
- Serializer.deserialize(object, FactoryInjectedTestRecord.class);
- Assert.assertEquals("foobar", record.getName());
- Assert.assertEquals(42L, record.getValue());
- }
-
- private static class DerivedTestRecord extends TestRecord {
- public String derived;
- }
-
- @Test
- public void testSubclassDeserialization() {
- CloudObject object = CloudObject.forClass(DerivedTestRecord.class);
-
- addString(object, "name", "foobar");
- addBoolean(object, "ok", true);
- addLong(object, "value", 42L);
- addDouble(object, "dValue", .25);
- addString(object, "derived", "baz");
-
- TestRecord result = Serializer.deserialize(object, TestRecord.class);
- Assert.assertThat(result, Matchers.instanceOf(DerivedTestRecord.class));
-
- DerivedTestRecord record = (DerivedTestRecord) result;
- Assert.assertEquals("foobar", record.name);
- Assert.assertEquals(true, record.ok);
- Assert.assertEquals(42L, record.value);
- Assert.assertEquals(0.25, record.dValue, 0.0001);
- Assert.assertEquals("baz", record.derived);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StructsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StructsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StructsTest.java
deleted file mode 100644
index 91090d1..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StructsTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.apache.beam.sdk.util.Structs.addBoolean;
-import static org.apache.beam.sdk.util.Structs.addDouble;
-import static org.apache.beam.sdk.util.Structs.addList;
-import static org.apache.beam.sdk.util.Structs.addLong;
-import static org.apache.beam.sdk.util.Structs.addLongs;
-import static org.apache.beam.sdk.util.Structs.addNull;
-import static org.apache.beam.sdk.util.Structs.addString;
-import static org.apache.beam.sdk.util.Structs.addStringList;
-import static org.apache.beam.sdk.util.Structs.getBoolean;
-import static org.apache.beam.sdk.util.Structs.getDictionary;
-import static org.apache.beam.sdk.util.Structs.getInt;
-import static org.apache.beam.sdk.util.Structs.getListOfMaps;
-import static org.apache.beam.sdk.util.Structs.getLong;
-import static org.apache.beam.sdk.util.Structs.getObject;
-import static org.apache.beam.sdk.util.Structs.getString;
-import static org.apache.beam.sdk.util.Structs.getStrings;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for Structs.
- */
-@RunWith(JUnit4.class)
-public class StructsTest {
- private List<Map<String, Object>> makeCloudObjects() {
- List<Map<String, Object>> objects = new ArrayList<>();
- {
- CloudObject o = CloudObject.forClassName("string");
- addString(o, "singletonStringKey", "stringValue");
- objects.add(o);
- }
- {
- CloudObject o = CloudObject.forClassName("long");
- addLong(o, "singletonLongKey", 42L);
- objects.add(o);
- }
- return objects;
- }
-
- private Map<String, Object> makeCloudDictionary() {
- Map<String, Object> o = new HashMap<>();
- addList(o, "emptyKey", Collections.<Map<String, Object>>emptyList());
- addNull(o, "noStringsKey");
- addString(o, "singletonStringKey", "stringValue");
- addStringList(o, "multipleStringsKey", Arrays.asList("hi", "there", "bob"));
- addLongs(o, "multipleLongsKey", 47L, 1L << 42, -5L);
- addLong(o, "singletonLongKey", 42L);
- addDouble(o, "singletonDoubleKey", 3.14);
- addBoolean(o, "singletonBooleanKey", true);
- addNull(o, "noObjectsKey");
- addList(o, "multipleObjectsKey", makeCloudObjects());
- return o;
- }
-
- @Test
- public void testGetStringParameter() throws Exception {
- Map<String, Object> o = makeCloudDictionary();
-
- Assert.assertEquals(
- "stringValue",
- getString(o, "singletonStringKey"));
- Assert.assertEquals(
- "stringValue",
- getString(o, "singletonStringKey", "defaultValue"));
- Assert.assertEquals(
- "defaultValue",
- getString(o, "missingKey", "defaultValue"));
-
- try {
- getString(o, "missingKey");
- Assert.fail("should have thrown an exception");
- } catch (Exception exn) {
- Assert.assertThat(exn.toString(),
- Matchers.containsString(
- "didn't find required parameter missingKey"));
- }
-
- try {
- getString(o, "noStringsKey");
- Assert.fail("should have thrown an exception");
- } catch (Exception exn) {
- Assert.assertThat(exn.toString(),
- Matchers.containsString("not a string"));
- }
-
- Assert.assertThat(getStrings(o, "noStringsKey", null), Matchers.<String>emptyIterable());
- Assert.assertThat(getObject(o, "noStringsKey").keySet(), Matchers.<String>emptyIterable());
- Assert.assertThat(getDictionary(o, "noStringsKey").keySet(), Matchers.<String>emptyIterable());
- Assert.assertThat(getDictionary(o, "noStringsKey", null).keySet(),
- Matchers.<String>emptyIterable());
-
- try {
- getString(o, "multipleStringsKey");
- Assert.fail("should have thrown an exception");
- } catch (Exception exn) {
- Assert.assertThat(exn.toString(),
- Matchers.containsString("not a string"));
- }
-
- try {
- getString(o, "emptyKey");
- Assert.fail("should have thrown an exception");
- } catch (Exception exn) {
- Assert.assertThat(exn.toString(),
- Matchers.containsString("not a string"));
- }
- }
-
- @Test
- public void testGetBooleanParameter() throws Exception {
- Map<String, Object> o = makeCloudDictionary();
-
- Assert.assertEquals(
- true,
- getBoolean(o, "singletonBooleanKey", false));
- Assert.assertEquals(
- false,
- getBoolean(o, "missingKey", false));
-
- try {
- getBoolean(o, "emptyKey", false);
- Assert.fail("should have thrown an exception");
- } catch (Exception exn) {
- Assert.assertThat(exn.toString(),
- Matchers.containsString("not a boolean"));
- }
- }
-
- @Test
- public void testGetLongParameter() throws Exception {
- Map<String, Object> o = makeCloudDictionary();
-
- Assert.assertEquals(
- (Long) 42L,
- getLong(o, "singletonLongKey", 666L));
- Assert.assertEquals(
- (Integer) 42,
- getInt(o, "singletonLongKey", 666));
- Assert.assertEquals(
- (Long) 666L,
- getLong(o, "missingKey", 666L));
-
- try {
- getLong(o, "emptyKey", 666L);
- Assert.fail("should have thrown an exception");
- } catch (Exception exn) {
- Assert.assertThat(exn.toString(),
- Matchers.containsString("not a long"));
- }
- try {
- getInt(o, "emptyKey", 666);
- Assert.fail("should have thrown an exception");
- } catch (Exception exn) {
- Assert.assertThat(exn.toString(),
- Matchers.containsString("not an int"));
- }
- }
-
- @Test
- public void testGetListOfMaps() throws Exception {
- Map<String, Object> o = makeCloudDictionary();
-
- Assert.assertEquals(
- makeCloudObjects(),
- getListOfMaps(o, "multipleObjectsKey", null));
-
- try {
- getListOfMaps(o, "singletonLongKey", null);
- Assert.fail("should have thrown an exception");
- } catch (Exception exn) {
- Assert.assertThat(exn.toString(),
- Matchers.containsString("not a list"));
- }
- }
-
- // TODO: Test builder operations.
-}
[2/3] beam git commit: [BEAM-1871] Move GCP specific serialization
CloudObject and supporting translation code to Dataflow runner module
Posted by lc...@apache.org.
[BEAM-1871] Move GCP specific serialization CloudObject and supporting translation code to Dataflow runner module
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a5627b1a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a5627b1a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a5627b1a
Branch: refs/heads/master
Commit: a5627b1a64696d7526bc5aeec5a0b51571fb5ef1
Parents: 320f9af
Author: Luke Cwik <lc...@google.com>
Authored: Wed May 3 10:03:45 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed May 3 12:46:17 2017 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 2 +-
.../dataflow/DataflowPipelineTranslator.java | 18 +-
.../beam/runners/dataflow/DataflowRunner.java | 2 +-
.../beam/runners/dataflow/ReadTranslator.java | 8 +-
.../dataflow/internal/CustomSources.java | 6 +-
.../util/AvroCoderCloudObjectTranslator.java | 2 -
.../beam/runners/dataflow/util/CloudObject.java | 1 -
.../runners/dataflow/util/CloudObjectKinds.java | 2 -
.../dataflow/util/CloudObjectTranslator.java | 2 -
.../dataflow/util/CloudObjectTranslators.java | 23 +-
.../runners/dataflow/util/CloudObjects.java | 1 -
.../CoderCloudObjectTranslatorRegistrar.java | 1 -
.../runners/dataflow/util/PropertyNames.java | 112 ++++++
.../SerializableCoderCloudObjectTranslator.java | 2 -
.../beam/runners/dataflow/util/Serializer.java | 262 -------------
.../beam/runners/dataflow/util/Structs.java | 372 +++++++++++++++++++
.../DataflowPipelineTranslatorTest.java | 10 +-
.../runners/dataflow/util/CloudObjectsTest.java | 1 -
.../beam/runners/dataflow/util/StructsTest.java | 206 ++++++++++
.../apache/beam/sdk/coders/CollectionCoder.java | 13 -
.../apache/beam/sdk/coders/IterableCoder.java | 13 -
.../org/apache/beam/sdk/coders/KvCoder.java | 13 -
.../beam/sdk/coders/LengthPrefixCoder.java | 13 -
.../org/apache/beam/sdk/coders/ListCoder.java | 13 -
.../org/apache/beam/sdk/coders/SetCoder.java | 16 -
.../apache/beam/sdk/util/CloudKnownType.java | 143 -------
.../org/apache/beam/sdk/util/CloudObject.java | 187 ----------
.../org/apache/beam/sdk/util/CoderUtils.java | 117 ------
.../org/apache/beam/sdk/util/PropertyNames.java | 112 ------
.../org/apache/beam/sdk/util/Serializer.java | 147 --------
.../java/org/apache/beam/sdk/util/Structs.java | 371 ------------------
.../java/org/apache/beam/sdk/util/Values.java | 88 -----
.../org/apache/beam/sdk/util/WindowedValue.java | 22 --
.../org/apache/beam/sdk/values/TupleTag.java | 22 --
.../apache/beam/sdk/util/SerializerTest.java | 162 --------
.../org/apache/beam/sdk/util/StructsTest.java | 206 ----------
36 files changed, 729 insertions(+), 1962 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index bbad156..30ef84d 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
<packaging>jar</packaging>
<properties>
- <dataflow.container_version>beam-master-20170502</dataflow.container_version>
+ <dataflow.container_version>beam-master-20170503</dataflow.container_version>
<dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
<dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
</properties>
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 28a9c1c..05edd28 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -21,15 +21,15 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.beam.runners.dataflow.util.Structs.addBoolean;
+import static org.apache.beam.runners.dataflow.util.Structs.addDictionary;
+import static org.apache.beam.runners.dataflow.util.Structs.addList;
+import static org.apache.beam.runners.dataflow.util.Structs.addLong;
+import static org.apache.beam.runners.dataflow.util.Structs.addObject;
+import static org.apache.beam.runners.dataflow.util.Structs.addString;
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
-import static org.apache.beam.sdk.util.Structs.addBoolean;
-import static org.apache.beam.sdk.util.Structs.addDictionary;
-import static org.apache.beam.sdk.util.Structs.addList;
-import static org.apache.beam.sdk.util.Structs.addLong;
-import static org.apache.beam.sdk.util.Structs.addObject;
-import static org.apache.beam.sdk.util.Structs.addString;
-import static org.apache.beam.sdk.util.Structs.getString;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -62,9 +62,11 @@ import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle;
import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.DoFnInfo;
import org.apache.beam.runners.dataflow.util.OutputReference;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.Coder;
@@ -86,8 +88,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 6aaa11b..7da1755 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -73,6 +73,7 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
import org.apache.beam.runners.dataflow.util.DataflowTransport;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.PipelineResult.State;
@@ -115,7 +116,6 @@ import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.util.PathValidator;
-import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.Reshuffle;
import org.apache.beam.sdk.util.ValueWithRecordId;
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
index bc68511..c304c32 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
@@ -17,20 +17,20 @@
*/
package org.apache.beam.runners.dataflow;
-import static org.apache.beam.sdk.util.Structs.addBoolean;
-import static org.apache.beam.sdk.util.Structs.addDictionary;
-import static org.apache.beam.sdk.util.Structs.addLong;
+import static org.apache.beam.runners.dataflow.util.Structs.addBoolean;
+import static org.apache.beam.runners.dataflow.util.Structs.addDictionary;
+import static org.apache.beam.runners.dataflow.util.Structs.addLong;
import com.google.api.services.dataflow.model.SourceMetadata;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.dataflow.internal.CustomSources;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.values.PValue;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
index 778ccf3..0d93566 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
@@ -19,9 +19,9 @@ package org.apache.beam.runners.dataflow.internal;
import static com.google.api.client.util.Base64.encodeBase64String;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.runners.dataflow.util.Structs.addString;
+import static org.apache.beam.runners.dataflow.util.Structs.addStringList;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
-import static org.apache.beam.sdk.util.Structs.addString;
-import static org.apache.beam.sdk.util.Structs.addStringList;
import com.google.api.services.dataflow.model.SourceMetadata;
import com.google.common.annotations.VisibleForTesting;
@@ -29,11 +29,11 @@ import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.CloudObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
index 444849d..c4d807e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.dataflow.util;
import org.apache.avro.Schema;
import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Structs;
/** A {@link CloudObjectTranslator} for {@link AvroCoder}. */
class AvroCoderCloudObjectTranslator implements CloudObjectTranslator<AvroCoder> {
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java
index e4dd9be..b3680e9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java
@@ -24,7 +24,6 @@ import com.google.api.client.json.GenericJson;
import com.google.api.client.util.Key;
import java.util.Map;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.util.PropertyNames;
/**
* A representation of an arbitrary Java object to be instantiated by Dataflow
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
index 1499f17..403ade2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
@@ -18,8 +18,6 @@
package org.apache.beam.runners.dataflow.util;
-import org.apache.beam.sdk.util.CloudObject;
-
/**
* Known kinds of {@link CloudObject}.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
index 534370f..775495b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
@@ -18,8 +18,6 @@
package org.apache.beam.runners.dataflow.util;
-import org.apache.beam.sdk.util.CloudObject;
-
/**
* A translator that takes an object and creates a {@link CloudObject} which can be converted back
* to the original object.
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
index f3e3312..012a669 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
@@ -38,14 +38,12 @@ import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
-import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.StringUtils;
-import org.apache.beam.sdk.util.Structs;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.TupleTag;
@@ -94,7 +92,9 @@ class CloudObjectTranslators {
@Override
public KvCoder fromCloudObject(CloudObject object) {
- return KvCoder.of(getComponents(object));
+ List<Coder<?>> components = getComponents(object);
+ checkArgument(components.size() == 2, "Expecting 2 components, got %s", components.size());
+ return KvCoder.of(components.get(0), components.get(1));
}
@Override
@@ -125,7 +125,9 @@ class CloudObjectTranslators {
@Override
public IterableCoder fromCloudObject(CloudObject object) {
- return IterableCoder.of(getComponents(object));
+ List<Coder<?>> components = getComponents(object);
+ checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
+ return IterableCoder.of(components.get(0));
}
@Override
@@ -155,7 +157,9 @@ class CloudObjectTranslators {
@Override
public LengthPrefixCoder fromCloudObject(CloudObject object) {
- return LengthPrefixCoder.of(getComponents(object));
+ List<Coder<?>> components = getComponents(object);
+ checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
+ return LengthPrefixCoder.of(components.get(0));
}
@Override
@@ -246,7 +250,12 @@ class CloudObjectTranslators {
@Override
public FullWindowedValueCoder fromCloudObject(CloudObject object) {
- return FullWindowedValueCoder.of(getComponents(object));
+ List<Coder<?>> components = getComponents(object);
+ checkArgument(components.size() == 2,
+ "Expecting 2 components, got " + components.size());
+ @SuppressWarnings("unchecked")
+ Coder<? extends BoundedWindow> window = (Coder<? extends BoundedWindow>) components.get(1);
+ return FullWindowedValueCoder.of(components.get(0), window);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
index 9383c48..42c7012 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -26,7 +26,6 @@ import java.util.Map;
import java.util.ServiceLoader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.util.CloudObject;
/** Utilities for converting an object to a {@link CloudObject}. */
public class CloudObjects {
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java
index 446eb3b..928e629 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java
@@ -22,7 +22,6 @@ import com.google.auto.service.AutoService;
import java.util.Map;
import java.util.ServiceLoader;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.CloudObject;
/**
* {@link Coder} authors have the ability to automatically have their {@link Coder} registered with
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
new file mode 100644
index 0000000..c8c9903
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+/**
+ * Constant property names used by the SDK in CloudWorkflow specifications.
+ */
+public class PropertyNames {
+ public static final String ALLOWED_ENCODINGS = "allowed_encodings";
+ public static final String APPEND_TRAILING_NEWLINES = "append_trailing_newlines";
+ public static final String BIGQUERY_CREATE_DISPOSITION = "create_disposition";
+ public static final String BIGQUERY_DATASET = "dataset";
+ public static final String BIGQUERY_PROJECT = "project";
+ public static final String BIGQUERY_SCHEMA = "schema";
+ public static final String BIGQUERY_TABLE = "table";
+ public static final String BIGQUERY_QUERY = "bigquery_query";
+ public static final String BIGQUERY_FLATTEN_RESULTS = "bigquery_flatten_results";
+ public static final String BIGQUERY_USE_LEGACY_SQL = "bigquery_use_legacy_sql";
+ public static final String BIGQUERY_WRITE_DISPOSITION = "write_disposition";
+ public static final String BIGQUERY_EXPORT_FORMAT = "bigquery_export_format";
+ public static final String BIGQUERY_EXPORT_SCHEMA = "bigquery_export_schema";
+ public static final String CO_GBK_RESULT_SCHEMA = "co_gbk_result_schema";
+ public static final String COMBINE_FN = "combine_fn";
+ public static final String COMPONENT_ENCODINGS = "component_encodings";
+ public static final String COMPRESSION_TYPE = "compression_type";
+ public static final String CUSTOM_SOURCE_FORMAT = "custom_source";
+ public static final String CONCAT_SOURCE_SOURCES = "sources";
+ public static final String CONCAT_SOURCE_BASE_SPECS = "base_specs";
+ public static final String SOURCE_STEP_INPUT = "custom_source_step_input";
+ public static final String SOURCE_SPEC = "spec";
+ public static final String SOURCE_METADATA = "metadata";
+ public static final String SOURCE_DOES_NOT_NEED_SPLITTING = "does_not_need_splitting";
+ public static final String SOURCE_PRODUCES_SORTED_KEYS = "produces_sorted_keys";
+ public static final String SOURCE_IS_INFINITE = "is_infinite";
+ public static final String SOURCE_ESTIMATED_SIZE_BYTES = "estimated_size_bytes";
+ public static final String ELEMENT = "element";
+ public static final String ELEMENTS = "elements";
+ public static final String ENCODING = "encoding";
+ public static final String ENCODING_ID = "encoding_id";
+ public static final String END_INDEX = "end_index";
+ public static final String END_OFFSET = "end_offset";
+ public static final String END_SHUFFLE_POSITION = "end_shuffle_position";
+ public static final String ENVIRONMENT_VERSION_JOB_TYPE_KEY = "job_type";
+ public static final String ENVIRONMENT_VERSION_MAJOR_KEY = "major";
+ public static final String FILENAME = "filename";
+ public static final String FILENAME_PREFIX = "filename_prefix";
+ public static final String FILENAME_SUFFIX = "filename_suffix";
+ public static final String FILEPATTERN = "filepattern";
+ public static final String FOOTER = "footer";
+ public static final String FORMAT = "format";
+ public static final String HEADER = "header";
+ public static final String INPUTS = "inputs";
+ public static final String INPUT_CODER = "input_coder";
+ public static final String IS_GENERATED = "is_generated";
+ public static final String IS_MERGING_WINDOW_FN = "is_merging_window_fn";
+ public static final String IS_PAIR_LIKE = "is_pair_like";
+ public static final String IS_STREAM_LIKE = "is_stream_like";
+ public static final String IS_WRAPPER = "is_wrapper";
+ public static final String DISALLOW_COMBINER_LIFTING = "disallow_combiner_lifting";
+ public static final String NON_PARALLEL_INPUTS = "non_parallel_inputs";
+ public static final String NUM_SHARD_CODERS = "num_shard_coders";
+ public static final String NUM_METADATA_SHARD_CODERS = "num_metadata_shard_coders";
+ public static final String NUM_SHARDS = "num_shards";
+ public static final String OBJECT_TYPE_NAME = "@type";
+ public static final String OUTPUT = "output";
+ public static final String OUTPUT_INFO = "output_info";
+ public static final String OUTPUT_NAME = "output_name";
+ public static final String PARALLEL_INPUT = "parallel_input";
+ public static final String PHASE = "phase";
+ public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label";
+ public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = "pubsub_serialized_attributes_fn";
+ public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
+ public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override";
+ public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = "pubsub_timestamp_label";
+ public static final String PUBSUB_TOPIC = "pubsub_topic";
+ public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override";
+ public static final String SCALAR_FIELD_NAME = "value";
+ public static final String SERIALIZED_FN = "serialized_fn";
+ public static final String SHARD_NAME_TEMPLATE = "shard_template";
+ public static final String SHUFFLE_KIND = "shuffle_kind";
+ public static final String SHUFFLE_READER_CONFIG = "shuffle_reader_config";
+ public static final String SHUFFLE_WRITER_CONFIG = "shuffle_writer_config";
+ public static final String SORT_VALUES = "sort_values";
+ public static final String START_INDEX = "start_index";
+ public static final String START_OFFSET = "start_offset";
+ public static final String START_SHUFFLE_POSITION = "start_shuffle_position";
+ public static final String STRIP_TRAILING_NEWLINES = "strip_trailing_newlines";
+ public static final String TUPLE_TAGS = "tuple_tags";
+ public static final String USE_INDEXED_FORMAT = "use_indexed_format";
+ public static final String USER_FN = "user_fn";
+ public static final String USER_NAME = "user_name";
+ public static final String USES_KEYED_STATE = "uses_keyed_state";
+ public static final String VALIDATE_SINK = "validate_sink";
+ public static final String VALIDATE_SOURCE = "validate_source";
+ public static final String VALUE = "value";
+ public static final String DISPLAY_DATA = "display_data";
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
index 67c021c..dcc311e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
@@ -22,8 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.io.Serializable;
import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Structs;
/** A {@link CloudObjectTranslator} for {@link SerializableCoder}. */
class SerializableCoderCloudObjectTranslator implements CloudObjectTranslator<SerializableCoder> {
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java
deleted file mode 100644
index e2bcafe..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.util;
-
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
-import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DatabindContext;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
-import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.fasterxml.jackson.databind.type.TypeFactory;
-import com.google.common.collect.ImmutableMap;
-import java.lang.reflect.TypeVariable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.LengthPrefixCoder;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * Utility for converting objects between Java and Cloud representations.
- *
- * @deprecated Will no longer be used once all coders are converted via {@link CloudObjects}.
- */
-@Deprecated
-public final class Serializer {
- /** A mapping from well known coder types to their implementing classes. */
- private static final Map<String, Class<?>> WELL_KNOWN_CODER_TYPES =
- ImmutableMap.<String, Class<?>>builder()
- .put("kind:pair", KvCoder.class)
- .put("kind:stream", IterableCoder.class)
- .put("kind:global_window", GlobalWindow.Coder.class)
- .put("kind:interval_window", IntervalWindow.IntervalWindowCoder.class)
- .put("kind:length_prefix", LengthPrefixCoder.class)
- .put("kind:windowed_value", WindowedValue.FullWindowedValueCoder.class)
- .build();
-
- // Delay initialization of statics until the first call to Serializer.
- private static class SingletonHelper {
- static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
- static final ObjectMapper TREE_MAPPER = createTreeMapper();
-
- /**
- * Creates the object mapper that will be used for serializing Google API
- * client maps into Jackson trees.
- */
- private static ObjectMapper createTreeMapper() {
- return new ObjectMapper();
- }
-
- /**
- * Creates the object mapper that will be used for deserializing Jackson
- * trees into objects.
- */
- private static ObjectMapper createObjectMapper() {
- ObjectMapper m = new ObjectMapper();
- // Ignore properties that are not used by the object.
- m.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
-
- // For parameters of type Object, use the @type property to determine the
- // class to instantiate.
- //
- // TODO: It would be ideal to do this for all non-final classes. The
- // problem with using DefaultTyping.NON_FINAL is that it insists on having
- // type information in the JSON for classes with useful default
- // implementations, such as List. Ideally, we'd combine these defaults
- // with available type information if that information's present.
- m.enableDefaultTypingAsProperty(
- ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT,
- PropertyNames.OBJECT_TYPE_NAME);
-
- m.registerModule(new Jackson2Module());
-
- return m;
- }
- }
-
- /**
- * Deserializes an object from a Dataflow structured encoding (represented in
- * Java as a map).
- *
- * <p>The standard Dataflow SDK object serialization protocol is based on JSON.
- * Data is typically encoded as a JSON object whose fields represent the
- * object's data.
- *
- * <p>The actual deserialization is performed by Jackson, which can deserialize
- * public fields, use JavaBean setters, or use injection annotations to
- * indicate how to construct the object. The {@link ObjectMapper} used is
- * configured to use the "@type" field as the name of the class to instantiate
- * (supporting polymorphic types), and may be further configured by
- * annotations or via {@link ObjectMapper#registerModule}.
- *
- * @see <a href="http://wiki.fasterxml.com/JacksonFAQ#Data_Binding.2C_general">
- * Jackson Data-Binding</a>
- * @see <a href="https://github.com/FasterXML/jackson-annotations/wiki/Jackson-Annotations">
- * Jackson-Annotations</a>
- * @param serialized the object in untyped decoded form (i.e. a nested {@link Map})
- * @param clazz the expected object class
- */
- public static <T> T deserialize(Map<String, Object> serialized, Class<T> clazz) {
- try {
- return SingletonHelper.OBJECT_MAPPER.treeToValue(
- SingletonHelper.TREE_MAPPER.valueToTree(
- deserializeCloudKnownTypes(serialized)),
- clazz);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(
- "Unable to deserialize class " + clazz, e);
- }
- }
-
- /**
- * Recursively walks the supplied map, looking for well-known cloud type information (keyed as
- * {@link PropertyNames#OBJECT_TYPE_NAME}, matching a URI value from the {@link CloudKnownType}
- * enum. Upon finding this type information, it converts it into the correspondingly typed Java
- * value.
- */
- @SuppressWarnings("unchecked")
- private static Object deserializeCloudKnownTypes(Object src) {
- if (src instanceof Map) {
- Map<String, Object> srcMap = (Map<String, Object>) src;
- @Nullable Object value = srcMap.get(PropertyNames.SCALAR_FIELD_NAME);
- @Nullable CloudKnownType type =
- CloudKnownType.forUri((String) srcMap.get(PropertyNames.OBJECT_TYPE_NAME));
- if (type != null && value != null) {
- // It's a value of a well-known cloud type; let the known type handler
- // handle the translation.
- Object result = type.parse(value, type.defaultClass());
- return result;
- }
- // Otherwise, it's just an ordinary map.
- Map<String, Object> dest = new HashMap<>(srcMap.size());
- for (Map.Entry<String, Object> entry : srcMap.entrySet()) {
- dest.put(entry.getKey(), deserializeCloudKnownTypes(entry.getValue()));
- }
- return dest;
- }
- if (src instanceof List) {
- List<Object> srcList = (List<Object>) src;
- List<Object> dest = new ArrayList<>(srcList.size());
- for (Object obj : srcList) {
- dest.add(deserializeCloudKnownTypes(obj));
- }
- return dest;
- }
- // Neither a Map nor a List; no translation needed.
- return src;
- }
-
- /**
- * A {@link com.fasterxml.jackson.databind.Module} that adds the type
- * resolver needed for Coder definitions.
- */
- static final class Jackson2Module extends SimpleModule {
- /**
- * The Coder custom type resolver.
- *
- * <p>This resolver resolves coders. If the Coder ID is a particular
- * well-known identifier, it's replaced with the corresponding class.
- * All other Coder instances are resolved by class name, using the package
- * org.apache.beam.sdk.coders if there are no "."s in the ID.
- */
- private static final class Resolver extends TypeIdResolverBase {
- @SuppressWarnings("unused") // Used via @JsonTypeIdResolver annotation on Mixin
- public Resolver() {
- super(TypeFactory.defaultInstance().constructType(Coder.class),
- TypeFactory.defaultInstance());
- }
-
- @Override
- public JavaType typeFromId(DatabindContext context, String id) {
- Class<?> clazz = getClassForId(id);
- @SuppressWarnings("rawtypes")
- TypeVariable[] tvs = clazz.getTypeParameters();
- JavaType[] types = new JavaType[tvs.length];
- for (int lupe = 0; lupe < tvs.length; lupe++) {
- types[lupe] = TypeFactory.unknownType();
- }
- return _typeFactory.constructSimpleType(clazz, types);
- }
-
- private Class<?> getClassForId(String id) {
- try {
- if (id.contains(".")) {
- return Class.forName(id);
- }
-
- if (WELL_KNOWN_CODER_TYPES.containsKey(id)) {
- return WELL_KNOWN_CODER_TYPES.get(id);
- }
-
- // Otherwise, see if the ID is the name of a class in
- // org.apache.beam.sdk.coders. We do this via creating
- // the class object so that class loaders have a chance to get
- // involved -- and since we need the class object anyway.
- return Class.forName(Coder.class.getPackage().getName() + "." + id);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Unable to convert coder ID " + id + " to class", e);
- }
- }
-
- @Override
- public String idFromValueAndType(Object o, Class<?> clazz) {
- return clazz.getName();
- }
-
- @Override
- public String idFromValue(Object o) {
- return o.getClass().getName();
- }
-
- @Override
- public JsonTypeInfo.Id getMechanism() {
- return JsonTypeInfo.Id.CUSTOM;
- }
- }
-
- /**
- * The mixin class defining how Coders are handled by the deserialization
- * {@link ObjectMapper}.
- *
- * <p>This is done via a mixin so that this resolver is <i>only</i> used
- * during deserialization requested by the Apache Beam SDK.
- */
- @JsonTypeIdResolver(Resolver.class)
- @JsonTypeInfo(use = Id.CUSTOM, include = As.PROPERTY, property = PropertyNames.OBJECT_TYPE_NAME)
- private static final class Mixin {}
-
- public Jackson2Module() {
- super("BeamCoders");
- setMixInAnnotation(Coder.class, Mixin.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Structs.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Structs.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Structs.java
new file mode 100644
index 0000000..1929f3b
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Structs.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import com.google.api.client.util.Data;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.StringUtils;
+
+/**
+ * A collection of static methods for manipulating datastructure representations transferred via the
+ * Dataflow API.
+ */
+public final class Structs {
+ private Structs() {} // Non-instantiable
+
+ public static String getString(Map<String, Object> map, String name) {
+ return getValue(map, name, String.class, "a string");
+ }
+
+ public static String getString(
+ Map<String, Object> map, String name, @Nullable String defaultValue) {
+ return getValue(map, name, String.class, "a string", defaultValue);
+ }
+
+ public static byte[] getBytes(Map<String, Object> map, String name) {
+ @Nullable byte[] result = getBytes(map, name, null);
+ if (result == null) {
+ throw new ParameterNotFoundException(name, map);
+ }
+ return result;
+ }
+
+ @Nullable
+ public static byte[] getBytes(
+ Map<String, Object> map, String name, @Nullable byte[] defaultValue) {
+ @Nullable String jsonString = getString(map, name, null);
+ if (jsonString == null) {
+ return defaultValue;
+ }
+ // TODO: Need to agree on a format for encoding bytes in
+ // a string that can be sent to the backend, over the cloud
+ // map task work API. base64 encoding seems pretty common. Switch to it?
+ return StringUtils.jsonStringToByteArray(jsonString);
+ }
+
+ public static Boolean getBoolean(Map<String, Object> map, String name) {
+ return getValue(map, name, Boolean.class, "a boolean");
+ }
+
+ @Nullable
+ public static Boolean getBoolean(
+ Map<String, Object> map, String name, @Nullable Boolean defaultValue) {
+ return getValue(map, name, Boolean.class, "a boolean", defaultValue);
+ }
+
+ public static Long getLong(Map<String, Object> map, String name) {
+ return getValue(map, name, Long.class, "a long");
+ }
+
+ @Nullable
+ public static Long getLong(Map<String, Object> map, String name, @Nullable Long defaultValue) {
+ return getValue(map, name, Long.class, "a long", defaultValue);
+ }
+
+ public static Integer getInt(Map<String, Object> map, String name) {
+ return getValue(map, name, Integer.class, "an int");
+ }
+
+ @Nullable
+ public static Integer getInt(
+ Map<String, Object> map, String name, @Nullable Integer defaultValue) {
+ return getValue(map, name, Integer.class, "an int", defaultValue);
+ }
+
+ @Nullable
+ public static List<String> getStrings(
+ Map<String, Object> map, String name, @Nullable List<String> defaultValue) {
+ @Nullable Object value = map.get(name);
+ if (value == null) {
+ if (map.containsKey(name)) {
+ throw new IncorrectTypeException(name, map, "a string or a list");
+ }
+ return defaultValue;
+ }
+ if (Data.isNull(value)) {
+ // This is a JSON literal null. When represented as a list of strings,
+ // this is an empty list.
+ return Collections.<String>emptyList();
+ }
+ @Nullable String singletonString = decodeValue(value, String.class);
+ if (singletonString != null) {
+ return Collections.singletonList(singletonString);
+ }
+ if (!(value instanceof List)) {
+ throw new IncorrectTypeException(name, map, "a string or a list");
+ }
+ @SuppressWarnings("unchecked")
+ List<Object> elements = (List<Object>) value;
+ List<String> result = new ArrayList<>(elements.size());
+ for (Object o : elements) {
+ @Nullable String s = decodeValue(o, String.class);
+ if (s == null) {
+ throw new IncorrectTypeException(name, map, "a list of strings");
+ }
+ result.add(s);
+ }
+ return result;
+ }
+
+ public static Map<String, Object> getObject(Map<String, Object> map, String name) {
+ @Nullable Map<String, Object> result = getObject(map, name, null);
+ if (result == null) {
+ throw new ParameterNotFoundException(name, map);
+ }
+ return result;
+ }
+
+ @Nullable
+ public static Map<String, Object> getObject(
+ Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue) {
+ @Nullable Object value = map.get(name);
+ if (value == null) {
+ if (map.containsKey(name)) {
+ throw new IncorrectTypeException(name, map, "an object");
+ }
+ return defaultValue;
+ }
+ return checkObject(value, map, name);
+ }
+
+ private static Map<String, Object> checkObject(
+ Object value, Map<String, Object> map, String name) {
+ if (Data.isNull(value)) {
+ // This is a JSON literal null. When represented as an object, this is an
+ // empty map.
+ return Collections.<String, Object>emptyMap();
+ }
+ if (!(value instanceof Map)) {
+ throw new IncorrectTypeException(name, map, "an object (not a map)");
+ }
+ @SuppressWarnings("unchecked")
+ Map<String, Object> mapValue = (Map<String, Object>) value;
+ if (!mapValue.containsKey(PropertyNames.OBJECT_TYPE_NAME)) {
+ throw new IncorrectTypeException(
+ name, map, "an object (no \"" + PropertyNames.OBJECT_TYPE_NAME + "\" field)");
+ }
+ return mapValue;
+ }
+
+ @Nullable
+ public static List<Map<String, Object>> getListOfMaps(
+ Map<String, Object> map, String name, @Nullable List<Map<String, Object>> defaultValue) {
+ @Nullable Object value = map.get(name);
+ if (value == null) {
+ if (map.containsKey(name)) {
+ throw new IncorrectTypeException(name, map, "a list");
+ }
+ return defaultValue;
+ }
+ if (Data.isNull(value)) {
+ // This is a JSON literal null. When represented as a list,
+ // this is an empty list.
+ return Collections.<Map<String, Object>>emptyList();
+ }
+
+ if (!(value instanceof List)) {
+ throw new IncorrectTypeException(name, map, "a list");
+ }
+
+ List<?> elements = (List<?>) value;
+ for (Object elem : elements) {
+ if (!(elem instanceof Map)) {
+ throw new IncorrectTypeException(name, map, "a list of Map objects");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ List<Map<String, Object>> result = (List<Map<String, Object>>) elements;
+ return result;
+ }
+
+ public static Map<String, Object> getDictionary(Map<String, Object> map, String name) {
+ @Nullable Object value = map.get(name);
+ if (value == null) {
+ throw new ParameterNotFoundException(name, map);
+ }
+ if (Data.isNull(value)) {
+ // This is a JSON literal null. When represented as a dictionary, this is
+ // an empty map.
+ return Collections.<String, Object>emptyMap();
+ }
+ if (!(value instanceof Map)) {
+ throw new IncorrectTypeException(name, map, "a dictionary");
+ }
+ @SuppressWarnings("unchecked")
+ Map<String, Object> result = (Map<String, Object>) value;
+ return result;
+ }
+
+ @Nullable
+ public static Map<String, Object> getDictionary(
+ Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue) {
+ @Nullable Object value = map.get(name);
+ if (value == null) {
+ if (map.containsKey(name)) {
+ throw new IncorrectTypeException(name, map, "a dictionary");
+ }
+ return defaultValue;
+ }
+ if (Data.isNull(value)) {
+ // This is a JSON literal null. When represented as a dictionary, this is
+ // an empty map.
+ return Collections.<String, Object>emptyMap();
+ }
+ if (!(value instanceof Map)) {
+ throw new IncorrectTypeException(name, map, "a dictionary");
+ }
+ @SuppressWarnings("unchecked")
+ Map<String, Object> result = (Map<String, Object>) value;
+ return result;
+ }
+
+ // Builder operations.
+
+ public static void addString(Map<String, Object> map, String name, String value) {
+ addObject(map, name, CloudObject.forString(value));
+ }
+
+ public static void addBoolean(Map<String, Object> map, String name, boolean value) {
+ addObject(map, name, CloudObject.forBoolean(value));
+ }
+
+ public static void addLong(Map<String, Object> map, String name, long value) {
+ addObject(map, name, CloudObject.forInteger(value));
+ }
+
+ public static void addObject(Map<String, Object> map, String name, Map<String, Object> value) {
+ map.put(name, value);
+ }
+
+ public static void addNull(Map<String, Object> map, String name) {
+ map.put(name, Data.nullOf(Object.class));
+ }
+
+ public static void addLongs(Map<String, Object> map, String name, long... longs) {
+ List<Map<String, Object>> elements = new ArrayList<>(longs.length);
+ for (Long value : longs) {
+ elements.add(CloudObject.forInteger(value));
+ }
+ map.put(name, elements);
+ }
+
+ public static void addList(
+ Map<String, Object> map, String name, List<? extends Map<String, Object>> elements) {
+ map.put(name, elements);
+ }
+
+ public static void addStringList(Map<String, Object> map, String name, List<String> elements) {
+ ArrayList<CloudObject> objects = new ArrayList<>(elements.size());
+ for (String element : elements) {
+ objects.add(CloudObject.forString(element));
+ }
+ addList(map, name, objects);
+ }
+
+ public static <T extends Map<String, Object>> void addList(
+ Map<String, Object> map, String name, T[] elements) {
+ map.put(name, Arrays.asList(elements));
+ }
+
+ public static void addDictionary(
+ Map<String, Object> map, String name, Map<String, Object> value) {
+ map.put(name, value);
+ }
+
+ public static void addDouble(Map<String, Object> map, String name, Double value) {
+ addObject(map, name, CloudObject.forFloat(value));
+ }
+
+ // Helper methods for a few of the accessor methods.
+
+ private static <T> T getValue(Map<String, Object> map, String name, Class<T> clazz, String type) {
+ @Nullable T result = getValue(map, name, clazz, type, null);
+ if (result == null) {
+ throw new ParameterNotFoundException(name, map);
+ }
+ return result;
+ }
+
+ @Nullable
+ private static <T> T getValue(
+ Map<String, Object> map, String name, Class<T> clazz, String type, @Nullable T defaultValue) {
+ @Nullable Object value = map.get(name);
+ if (value == null) {
+ if (map.containsKey(name)) {
+ throw new IncorrectTypeException(name, map, type);
+ }
+ return defaultValue;
+ }
+ T result = decodeValue(value, clazz);
+ if (result == null) {
+ // The value exists, but can't be decoded.
+ throw new IncorrectTypeException(name, map, type);
+ }
+ return result;
+ }
+
+ @Nullable
+ private static <T> T decodeValue(Object value, Class<T> clazz) {
+ try {
+ if (value.getClass() == clazz) {
+ // decodeValue() is only called for final classes; if the class matches,
+ // it's safe to just return the value, and if it doesn't match, decoding
+ // is needed.
+ return clazz.cast(value);
+ }
+ if (!(value instanceof Map)) {
+ return null;
+ }
+ @SuppressWarnings("unchecked")
+ Map<String, Object> map = (Map<String, Object>) value;
+ @Nullable String typeName = (String) map.get(PropertyNames.OBJECT_TYPE_NAME);
+ if (typeName == null) {
+ return null;
+ }
+ @Nullable CloudKnownType knownType = CloudKnownType.forUri(typeName);
+ if (knownType == null) {
+ return null;
+ }
+ @Nullable Object scalar = map.get(PropertyNames.SCALAR_FIELD_NAME);
+ if (scalar == null) {
+ return null;
+ }
+ return knownType.parse(scalar, clazz);
+ } catch (ClassCastException e) {
+ // If any class cast fails during decoding, the value's not decodable.
+ return null;
+ }
+ }
+
+ private static final class ParameterNotFoundException extends RuntimeException {
+ public ParameterNotFoundException(String name, Map<String, Object> map) {
+ super("didn't find required parameter " + name + " in " + map);
+ }
+ }
+
+ private static final class IncorrectTypeException extends RuntimeException {
+ public IncorrectTypeException(String name, Map<String, Object> map, String type) {
+ super("required parameter " + name + " in " + map + " not " + type);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 31c47b4..41f3c92 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.runners.dataflow;
-import static org.apache.beam.sdk.util.Structs.addObject;
-import static org.apache.beam.sdk.util.Structs.getDictionary;
-import static org.apache.beam.sdk.util.Structs.getString;
+import static org.apache.beam.runners.dataflow.util.Structs.addObject;
+import static org.apache.beam.runners.dataflow.util.Structs.getDictionary;
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -60,6 +60,8 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecificat
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.runners.dataflow.util.OutputReference;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.util.Structs;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -80,8 +82,6 @@ import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.GcsPathValidator;
import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.Structs;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.util.state.StateSpec;
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index 2e66d43..64c0dbd 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -54,7 +54,6 @@ import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java
new file mode 100644
index 0000000..0d2bc9f
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.apache.beam.runners.dataflow.util.Structs.addBoolean;
+import static org.apache.beam.runners.dataflow.util.Structs.addDouble;
+import static org.apache.beam.runners.dataflow.util.Structs.addList;
+import static org.apache.beam.runners.dataflow.util.Structs.addLong;
+import static org.apache.beam.runners.dataflow.util.Structs.addLongs;
+import static org.apache.beam.runners.dataflow.util.Structs.addNull;
+import static org.apache.beam.runners.dataflow.util.Structs.addString;
+import static org.apache.beam.runners.dataflow.util.Structs.addStringList;
+import static org.apache.beam.runners.dataflow.util.Structs.getBoolean;
+import static org.apache.beam.runners.dataflow.util.Structs.getDictionary;
+import static org.apache.beam.runners.dataflow.util.Structs.getInt;
+import static org.apache.beam.runners.dataflow.util.Structs.getListOfMaps;
+import static org.apache.beam.runners.dataflow.util.Structs.getLong;
+import static org.apache.beam.runners.dataflow.util.Structs.getObject;
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
+import static org.apache.beam.runners.dataflow.util.Structs.getStrings;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for Structs.
+ */
+@RunWith(JUnit4.class)
+public class StructsTest {
+ private List<Map<String, Object>> makeCloudObjects() {
+ List<Map<String, Object>> objects = new ArrayList<>();
+ {
+ CloudObject o = CloudObject.forClassName("string");
+ addString(o, "singletonStringKey", "stringValue");
+ objects.add(o);
+ }
+ {
+ CloudObject o = CloudObject.forClassName("long");
+ addLong(o, "singletonLongKey", 42L);
+ objects.add(o);
+ }
+ return objects;
+ }
+
+ private Map<String, Object> makeCloudDictionary() {
+ Map<String, Object> o = new HashMap<>();
+ addList(o, "emptyKey", Collections.<Map<String, Object>>emptyList());
+ addNull(o, "noStringsKey");
+ addString(o, "singletonStringKey", "stringValue");
+ addStringList(o, "multipleStringsKey", Arrays.asList("hi", "there", "bob"));
+ addLongs(o, "multipleLongsKey", 47L, 1L << 42, -5L);
+ addLong(o, "singletonLongKey", 42L);
+ addDouble(o, "singletonDoubleKey", 3.14);
+ addBoolean(o, "singletonBooleanKey", true);
+ addNull(o, "noObjectsKey");
+ addList(o, "multipleObjectsKey", makeCloudObjects());
+ return o;
+ }
+
+ @Test
+ public void testGetStringParameter() throws Exception {
+ Map<String, Object> o = makeCloudDictionary();
+
+ Assert.assertEquals(
+ "stringValue",
+ getString(o, "singletonStringKey"));
+ Assert.assertEquals(
+ "stringValue",
+ getString(o, "singletonStringKey", "defaultValue"));
+ Assert.assertEquals(
+ "defaultValue",
+ getString(o, "missingKey", "defaultValue"));
+
+ try {
+ getString(o, "missingKey");
+ Assert.fail("should have thrown an exception");
+ } catch (Exception exn) {
+ Assert.assertThat(exn.toString(),
+ Matchers.containsString(
+ "didn't find required parameter missingKey"));
+ }
+
+ try {
+ getString(o, "noStringsKey");
+ Assert.fail("should have thrown an exception");
+ } catch (Exception exn) {
+ Assert.assertThat(exn.toString(),
+ Matchers.containsString("not a string"));
+ }
+
+ Assert.assertThat(getStrings(o, "noStringsKey", null), Matchers.<String>emptyIterable());
+ Assert.assertThat(getObject(o, "noStringsKey").keySet(), Matchers.<String>emptyIterable());
+ Assert.assertThat(getDictionary(o, "noStringsKey").keySet(), Matchers.<String>emptyIterable());
+ Assert.assertThat(getDictionary(o, "noStringsKey", null).keySet(),
+ Matchers.<String>emptyIterable());
+
+ try {
+ getString(o, "multipleStringsKey");
+ Assert.fail("should have thrown an exception");
+ } catch (Exception exn) {
+ Assert.assertThat(exn.toString(),
+ Matchers.containsString("not a string"));
+ }
+
+ try {
+ getString(o, "emptyKey");
+ Assert.fail("should have thrown an exception");
+ } catch (Exception exn) {
+ Assert.assertThat(exn.toString(),
+ Matchers.containsString("not a string"));
+ }
+ }
+
+ @Test
+ public void testGetBooleanParameter() throws Exception {
+ Map<String, Object> o = makeCloudDictionary();
+
+ Assert.assertEquals(
+ true,
+ getBoolean(o, "singletonBooleanKey", false));
+ Assert.assertEquals(
+ false,
+ getBoolean(o, "missingKey", false));
+
+ try {
+ getBoolean(o, "emptyKey", false);
+ Assert.fail("should have thrown an exception");
+ } catch (Exception exn) {
+ Assert.assertThat(exn.toString(),
+ Matchers.containsString("not a boolean"));
+ }
+ }
+
+ @Test
+ public void testGetLongParameter() throws Exception {
+ Map<String, Object> o = makeCloudDictionary();
+
+ Assert.assertEquals(
+ (Long) 42L,
+ getLong(o, "singletonLongKey", 666L));
+ Assert.assertEquals(
+ (Integer) 42,
+ getInt(o, "singletonLongKey", 666));
+ Assert.assertEquals(
+ (Long) 666L,
+ getLong(o, "missingKey", 666L));
+
+ try {
+ getLong(o, "emptyKey", 666L);
+ Assert.fail("should have thrown an exception");
+ } catch (Exception exn) {
+ Assert.assertThat(exn.toString(),
+ Matchers.containsString("not a long"));
+ }
+ try {
+ getInt(o, "emptyKey", 666);
+ Assert.fail("should have thrown an exception");
+ } catch (Exception exn) {
+ Assert.assertThat(exn.toString(),
+ Matchers.containsString("not an int"));
+ }
+ }
+
+ @Test
+ public void testGetListOfMaps() throws Exception {
+ Map<String, Object> o = makeCloudDictionary();
+
+ Assert.assertEquals(
+ makeCloudObjects(),
+ getListOfMaps(o, "multipleObjectsKey", null));
+
+ try {
+ getListOfMaps(o, "singletonLongKey", null);
+ Assert.fail("should have thrown an exception");
+ } catch (Exception exn) {
+ Assert.assertThat(exn.toString(),
+ Matchers.containsString("not a list"));
+ }
+ }
+
+ // TODO: Test builder operations.
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
index 3585f3e..523b69b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
@@ -17,13 +17,8 @@
*/
package org.apache.beam.sdk.coders;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collection;
import java.util.List;
-import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeParameter;
@@ -51,14 +46,6 @@ public class CollectionCoder<T> extends IterableLikeCoder<T, Collection<T>> {
return decodedElements;
}
- @JsonCreator
- public static CollectionCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
- return of(components.get(0));
- }
-
/**
* Returns the first element in this collection if it is non-empty,
* otherwise returns {@code null}.
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
index 2949ddb..02c3d0f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
@@ -17,12 +17,7 @@
*/
package org.apache.beam.sdk.coders;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
-import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeParameter;
@@ -46,14 +41,6 @@ public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
return decodedElements;
}
- @JsonCreator
- public static IterableCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
- return of(components.get(0));
- }
-
/**
* Returns the first element in this iterable if it is non-empty,
* otherwise returns {@code null}.
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index b10db3a..8a689f7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -17,16 +17,11 @@
*/
package org.apache.beam.sdk.coders;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -44,14 +39,6 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
return new KvCoder<>(keyCoder, valueCoder);
}
- @JsonCreator
- public static KvCoder<?, ?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- checkArgument(components.size() == 2, "Expecting 2 components, got %s", components.size());
- return of(components.get(0), components.get(1));
- }
-
public static <K, V> List<Object> getInstanceComponents(
KV<K, V> exampleValue) {
return Arrays.asList(
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index be26531..685e766 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -17,11 +17,8 @@
*/
package org.apache.beam.sdk.coders;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import java.io.ByteArrayOutputStream;
@@ -30,7 +27,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.VarInt;
/**
@@ -48,15 +44,6 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> {
return new LengthPrefixCoder<>(valueCoder);
}
- @JsonCreator
- public static LengthPrefixCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- checkArgument(components.size() == 1,
- "Expecting 1 components, got " + components.size());
- return of(components.get(0));
- }
-
/////////////////////////////////////////////////////////////////////////////
private final Coder<T> valueCoder;
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
index 6f7a0be..32467d2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
@@ -17,12 +17,7 @@
*/
package org.apache.beam.sdk.coders;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
-import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeParameter;
@@ -45,14 +40,6 @@ public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
return decodedElements;
}
- @JsonCreator
- public static ListCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
- return of((Coder<?>) components.get(0));
- }
-
/**
* Returns the first element in this list if it is non-empty,
* otherwise returns {@code null}.
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
index 68ef3dc..da16165 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
@@ -17,14 +17,9 @@
*/
package org.apache.beam.sdk.coders;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeParameter;
@@ -44,17 +39,6 @@ public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> {
}
/**
- * Dynamically typed constructor for JSON deserialization.
- */
- @JsonCreator
- public static SetCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
- return of(components.get(0));
- }
-
- /**
* {@inheritDoc}
*
* @throws NonDeterministicException always. Sets are not ordered, but
http://git-wip-us.apache.org/repos/asf/beam/blob/a5627b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java
deleted file mode 100644
index c9e7427..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * A utility for manipulating well-known cloud types.
- *
- * @deprecated replaced by {@code org.apache.beam.runners.dataflow.CloudKnownType}
- */
-@Deprecated
-enum CloudKnownType {
- TEXT("http://schema.org/Text", String.class) {
- @Override
- public <T> T parse(Object value, Class<T> clazz) {
- return clazz.cast(value);
- }
- },
- BOOLEAN("http://schema.org/Boolean", Boolean.class) {
- @Override
- public <T> T parse(Object value, Class<T> clazz) {
- return clazz.cast(value);
- }
- },
- INTEGER("http://schema.org/Integer", Long.class, Integer.class) {
- @Override
- public <T> T parse(Object value, Class<T> clazz) {
- Object result = null;
- if (value.getClass() == clazz) {
- result = value;
- } else if (clazz == Long.class) {
- if (value instanceof Integer) {
- result = ((Integer) value).longValue();
- } else if (value instanceof String) {
- result = Long.valueOf((String) value);
- }
- } else if (clazz == Integer.class) {
- if (value instanceof Long) {
- result = ((Long) value).intValue();
- } else if (value instanceof String) {
- result = Integer.valueOf((String) value);
- }
- }
- return clazz.cast(result);
- }
- },
- FLOAT("http://schema.org/Float", Double.class, Float.class) {
- @Override
- public <T> T parse(Object value, Class<T> clazz) {
- Object result = null;
- if (value.getClass() == clazz) {
- result = value;
- } else if (clazz == Double.class) {
- if (value instanceof Float) {
- result = ((Float) value).doubleValue();
- } else if (value instanceof String) {
- result = Double.valueOf((String) value);
- }
- } else if (clazz == Float.class) {
- if (value instanceof Double) {
- result = ((Double) value).floatValue();
- } else if (value instanceof String) {
- result = Float.valueOf((String) value);
- }
- }
- return clazz.cast(result);
- }
- };
-
- private final String uri;
- private final Class<?>[] classes;
-
- CloudKnownType(String uri, Class<?>... classes) {
- this.uri = uri;
- this.classes = classes;
- }
-
- public String getUri() {
- return uri;
- }
-
- public abstract <T> T parse(Object value, Class<T> clazz);
-
- public Class<?> defaultClass() {
- return classes[0];
- }
-
- private static final Map<String, CloudKnownType> typesByUri =
- Collections.unmodifiableMap(buildTypesByUri());
-
- private static Map<String, CloudKnownType> buildTypesByUri() {
- Map<String, CloudKnownType> result = new HashMap<>();
- for (CloudKnownType ty : CloudKnownType.values()) {
- result.put(ty.getUri(), ty);
- }
- return result;
- }
-
- @Nullable
- public static CloudKnownType forUri(@Nullable String uri) {
- if (uri == null) {
- return null;
- }
- return typesByUri.get(uri);
- }
-
- private static final Map<Class<?>, CloudKnownType> typesByClass =
- Collections.unmodifiableMap(buildTypesByClass());
-
- private static Map<Class<?>, CloudKnownType> buildTypesByClass() {
- Map<Class<?>, CloudKnownType> result = new HashMap<>();
- for (CloudKnownType ty : CloudKnownType.values()) {
- for (Class<?> clazz : ty.classes) {
- result.put(clazz, ty);
- }
- }
- return result;
- }
-
- @Nullable
- public static CloudKnownType forClass(Class<?> clazz) {
- return typesByClass.get(clazz);
- }
-}
[3/3] beam git commit: [BEAM-1871] Move GCP specific serialization
CloudObject and supporting translation code to Dataflow runner module
Posted by lc...@apache.org.
[BEAM-1871] Move GCP specific serialization CloudObject and supporting translation code to Dataflow runner module
This closes #2862
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aafa1bba
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aafa1bba
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aafa1bba
Branch: refs/heads/master
Commit: aafa1bba984ac9744ef3f21c60a402aee3cec293
Parents: 320f9af a5627b1
Author: Luke Cwik <lc...@google.com>
Authored: Wed May 3 12:47:25 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed May 3 12:47:25 2017 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 2 +-
.../dataflow/DataflowPipelineTranslator.java | 18 +-
.../beam/runners/dataflow/DataflowRunner.java | 2 +-
.../beam/runners/dataflow/ReadTranslator.java | 8 +-
.../dataflow/internal/CustomSources.java | 6 +-
.../util/AvroCoderCloudObjectTranslator.java | 2 -
.../beam/runners/dataflow/util/CloudObject.java | 1 -
.../runners/dataflow/util/CloudObjectKinds.java | 2 -
.../dataflow/util/CloudObjectTranslator.java | 2 -
.../dataflow/util/CloudObjectTranslators.java | 23 +-
.../runners/dataflow/util/CloudObjects.java | 1 -
.../CoderCloudObjectTranslatorRegistrar.java | 1 -
.../runners/dataflow/util/PropertyNames.java | 112 ++++++
.../SerializableCoderCloudObjectTranslator.java | 2 -
.../beam/runners/dataflow/util/Serializer.java | 262 -------------
.../beam/runners/dataflow/util/Structs.java | 372 +++++++++++++++++++
.../DataflowPipelineTranslatorTest.java | 10 +-
.../runners/dataflow/util/CloudObjectsTest.java | 1 -
.../beam/runners/dataflow/util/StructsTest.java | 206 ++++++++++
.../apache/beam/sdk/coders/CollectionCoder.java | 13 -
.../apache/beam/sdk/coders/IterableCoder.java | 13 -
.../org/apache/beam/sdk/coders/KvCoder.java | 13 -
.../beam/sdk/coders/LengthPrefixCoder.java | 13 -
.../org/apache/beam/sdk/coders/ListCoder.java | 13 -
.../org/apache/beam/sdk/coders/SetCoder.java | 16 -
.../apache/beam/sdk/util/CloudKnownType.java | 143 -------
.../org/apache/beam/sdk/util/CloudObject.java | 187 ----------
.../org/apache/beam/sdk/util/CoderUtils.java | 117 ------
.../org/apache/beam/sdk/util/PropertyNames.java | 112 ------
.../org/apache/beam/sdk/util/Serializer.java | 147 --------
.../java/org/apache/beam/sdk/util/Structs.java | 371 ------------------
.../java/org/apache/beam/sdk/util/Values.java | 88 -----
.../org/apache/beam/sdk/util/WindowedValue.java | 22 --
.../org/apache/beam/sdk/values/TupleTag.java | 22 --
.../apache/beam/sdk/util/SerializerTest.java | 162 --------
.../org/apache/beam/sdk/util/StructsTest.java | 206 ----------
36 files changed, 729 insertions(+), 1962 deletions(-)
----------------------------------------------------------------------