You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/02 01:03:39 UTC
[1/2] beam git commit: Copy CloudObject to the Dataflow Module
Repository: beam
Updated Branches:
refs/heads/master 7f50ea2e5 -> 3afd338fd
Copy CloudObject to the Dataflow Module
Once migrated on the Dataflow worker, these classes can be removed from
the sdk.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/79b364f7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/79b364f7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/79b364f7
Branch: refs/heads/master
Commit: 79b364f7df6122e438a4ee0f12b5cdc7cb694d91
Parents: 7f50ea2
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 1 14:31:17 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon May 1 18:02:23 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/util/CloudKnownType.java | 138 ++++++++++
.../beam/runners/dataflow/util/CloudObject.java | 185 +++++++++++++
.../beam/runners/dataflow/util/Serializer.java | 262 +++++++++++++++++++
.../apache/beam/sdk/util/CloudKnownType.java | 7 +-
.../org/apache/beam/sdk/util/CloudObject.java | 3 +
.../org/apache/beam/sdk/util/CoderUtils.java | 15 +-
.../org/apache/beam/sdk/util/Serializer.java | 3 +
.../apache/beam/sdk/util/CoderUtilsTest.java | 104 --------
8 files changed, 600 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java
new file mode 100644
index 0000000..ce23a1b
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/** A utility for manipulating well-known cloud types. */
+enum CloudKnownType {
+ TEXT("http://schema.org/Text", String.class) {
+ @Override
+ public <T> T parse(Object value, Class<T> clazz) {
+ return clazz.cast(value);
+ }
+ },
+ BOOLEAN("http://schema.org/Boolean", Boolean.class) {
+ @Override
+ public <T> T parse(Object value, Class<T> clazz) {
+ return clazz.cast(value);
+ }
+ },
+ INTEGER("http://schema.org/Integer", Long.class, Integer.class) {
+ @Override
+ public <T> T parse(Object value, Class<T> clazz) {
+ Object result = null;
+ if (value.getClass() == clazz) {
+ result = value;
+ } else if (clazz == Long.class) {
+ if (value instanceof Integer) {
+ result = ((Integer) value).longValue();
+ } else if (value instanceof String) {
+ result = Long.valueOf((String) value);
+ }
+ } else if (clazz == Integer.class) {
+ if (value instanceof Long) {
+ result = ((Long) value).intValue();
+ } else if (value instanceof String) {
+ result = Integer.valueOf((String) value);
+ }
+ }
+ return clazz.cast(result);
+ }
+ },
+ FLOAT("http://schema.org/Float", Double.class, Float.class) {
+ @Override
+ public <T> T parse(Object value, Class<T> clazz) {
+ Object result = null;
+ if (value.getClass() == clazz) {
+ result = value;
+ } else if (clazz == Double.class) {
+ if (value instanceof Float) {
+ result = ((Float) value).doubleValue();
+ } else if (value instanceof String) {
+ result = Double.valueOf((String) value);
+ }
+ } else if (clazz == Float.class) {
+ if (value instanceof Double) {
+ result = ((Double) value).floatValue();
+ } else if (value instanceof String) {
+ result = Float.valueOf((String) value);
+ }
+ }
+ return clazz.cast(result);
+ }
+ };
+
+ private final String uri;
+ private final Class<?>[] classes;
+
+ CloudKnownType(String uri, Class<?>... classes) {
+ this.uri = uri;
+ this.classes = classes;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ public abstract <T> T parse(Object value, Class<T> clazz);
+
+ public Class<?> defaultClass() {
+ return classes[0];
+ }
+
+ private static final Map<String, CloudKnownType> typesByUri =
+ Collections.unmodifiableMap(buildTypesByUri());
+
+ private static Map<String, CloudKnownType> buildTypesByUri() {
+ Map<String, CloudKnownType> result = new HashMap<>();
+ for (CloudKnownType ty : CloudKnownType.values()) {
+ result.put(ty.getUri(), ty);
+ }
+ return result;
+ }
+
+ @Nullable
+ public static CloudKnownType forUri(@Nullable String uri) {
+ if (uri == null) {
+ return null;
+ }
+ return typesByUri.get(uri);
+ }
+
+ private static final Map<Class<?>, CloudKnownType> typesByClass =
+ Collections.unmodifiableMap(buildTypesByClass());
+
+ private static Map<Class<?>, CloudKnownType> buildTypesByClass() {
+ Map<Class<?>, CloudKnownType> result = new HashMap<>();
+ for (CloudKnownType ty : CloudKnownType.values()) {
+ for (Class<?> clazz : ty.classes) {
+ result.put(clazz, ty);
+ }
+ }
+ return result;
+ }
+
+ @Nullable
+ public static CloudKnownType forClass(Class<?> clazz) {
+ return typesByClass.get(clazz);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java
new file mode 100644
index 0000000..e4dd9be
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.util.Key;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.PropertyNames;
+
+/**
+ * A representation of an arbitrary Java object to be instantiated by Dataflow
+ * workers.
+ *
+ * <p>Typically, an object to be written by the SDK to the Dataflow service will
+ * implement a method (typically called {@code asCloudObject()}) that returns a
+ * {@code CloudObject} to represent the object in the protocol. Once the
+ * {@code CloudObject} is constructed, the method should explicitly add
+ * additional properties to be presented during deserialization, representing
+ * child objects by building additional {@code CloudObject}s.
+ */
+public final class CloudObject extends GenericJson {
+ /**
+ * Constructs a {@code CloudObject} by copying the supplied serialized object
+ * spec, which must represent an SDK object serialized for transport via the
+ * Dataflow API.
+ *
+ * <p>The most common use of this method is during deserialization on the worker,
+ * where it's used as a binding type during instance construction.
+ *
+ * @param spec supplies the serialized form of the object as a nested map
+ * @throws RuntimeException if the supplied map does not represent an SDK object
+ */
+ public static CloudObject fromSpec(Map<String, Object> spec) {
+ CloudObject result = new CloudObject();
+ result.putAll(spec);
+ if (result.className == null) {
+ throw new RuntimeException("Unable to create an SDK object from " + spec
+ + ": Object class not specified (missing \""
+ + PropertyNames.OBJECT_TYPE_NAME + "\" field)");
+ }
+ return result;
+ }
+
+ /**
+ * Constructs a {@code CloudObject} to be used for serializing an instance of
+ * the supplied class for transport via the Dataflow API. The instance
+ * parameters to be serialized must be supplied explicitly after the
+ * {@code CloudObject} is created, by using {@link CloudObject#put}.
+ *
+ * @param cls the class to use when deserializing the object on the worker
+ */
+ public static CloudObject forClass(Class<?> cls) {
+ CloudObject result = new CloudObject();
+ result.className = checkNotNull(cls).getName();
+ return result;
+ }
+
+ /**
+ * Constructs a {@code CloudObject} to be used for serializing data to be
+ * deserialized using the supplied class name for
+ * transport via the Dataflow API. The instance parameters to be serialized
+ * must be supplied explicitly after the {@code CloudObject} is created, by
+ * using {@link CloudObject#put}.
+ *
+ * @param className the class to use when deserializing the object on the worker
+ */
+ public static CloudObject forClassName(String className) {
+ CloudObject result = new CloudObject();
+ result.className = checkNotNull(className);
+ return result;
+ }
+
+ /**
+ * Constructs a {@code CloudObject} representing the given value.
+ * @param value the scalar value to represent.
+ */
+ public static CloudObject forString(String value) {
+ CloudObject result = forClassName(CloudKnownType.TEXT.getUri());
+ result.put(PropertyNames.SCALAR_FIELD_NAME, value);
+ return result;
+ }
+
+ /**
+ * Constructs a {@code CloudObject} representing the given value.
+ * @param value the scalar value to represent.
+ */
+ public static CloudObject forBoolean(Boolean value) {
+ CloudObject result = forClassName(CloudKnownType.BOOLEAN.getUri());
+ result.put(PropertyNames.SCALAR_FIELD_NAME, value);
+ return result;
+ }
+
+ /**
+ * Constructs a {@code CloudObject} representing the given value.
+ * @param value the scalar value to represent.
+ */
+ public static CloudObject forInteger(Long value) {
+ CloudObject result = forClassName(CloudKnownType.INTEGER.getUri());
+ result.put(PropertyNames.SCALAR_FIELD_NAME, value);
+ return result;
+ }
+
+ /**
+ * Constructs a {@code CloudObject} representing the given value.
+ * @param value the scalar value to represent.
+ */
+ public static CloudObject forInteger(Integer value) {
+ CloudObject result = forClassName(CloudKnownType.INTEGER.getUri());
+ result.put(PropertyNames.SCALAR_FIELD_NAME, value);
+ return result;
+ }
+
+ /**
+ * Constructs a {@code CloudObject} representing the given value.
+ * @param value the scalar value to represent.
+ */
+ public static CloudObject forFloat(Float value) {
+ CloudObject result = forClassName(CloudKnownType.FLOAT.getUri());
+ result.put(PropertyNames.SCALAR_FIELD_NAME, value);
+ return result;
+ }
+
+ /**
+ * Constructs a {@code CloudObject} representing the given value.
+ * @param value the scalar value to represent.
+ */
+ public static CloudObject forFloat(Double value) {
+ CloudObject result = forClassName(CloudKnownType.FLOAT.getUri());
+ result.put(PropertyNames.SCALAR_FIELD_NAME, value);
+ return result;
+ }
+
+ /**
+ * Constructs a {@code CloudObject} representing the given value of a
+ * well-known cloud object type.
+ * @param value the scalar value to represent.
+ * @throws RuntimeException if the value does not have a
+ * {@link CloudKnownType} mapping
+ */
+ public static CloudObject forKnownType(Object value) {
+ @Nullable CloudKnownType ty = CloudKnownType.forClass(value.getClass());
+ if (ty == null) {
+ throw new RuntimeException("Unable to represent value via the Dataflow API: " + value);
+ }
+ CloudObject result = forClassName(ty.getUri());
+ result.put(PropertyNames.SCALAR_FIELD_NAME, value);
+ return result;
+ }
+
+ @Key(PropertyNames.OBJECT_TYPE_NAME)
+ private String className;
+
+ private CloudObject() {}
+
+ /**
+ * Gets the name of the Java class that this CloudObject represents.
+ */
+ public String getClassName() {
+ return className;
+ }
+
+ @Override
+ public CloudObject clone() {
+ return (CloudObject) super.clone();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java
new file mode 100644
index 0000000..e2bcafe
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DatabindContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
+import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import com.google.common.collect.ImmutableMap;
+import java.lang.reflect.TypeVariable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Utility for converting objects between Java and Cloud representations.
+ *
+ * @deprecated Will no longer be used once all coders are converted via {@link CloudObjects}.
+ */
+@Deprecated
+public final class Serializer {
+ /** A mapping from well known coder types to their implementing classes. */
+ private static final Map<String, Class<?>> WELL_KNOWN_CODER_TYPES =
+ ImmutableMap.<String, Class<?>>builder()
+ .put("kind:pair", KvCoder.class)
+ .put("kind:stream", IterableCoder.class)
+ .put("kind:global_window", GlobalWindow.Coder.class)
+ .put("kind:interval_window", IntervalWindow.IntervalWindowCoder.class)
+ .put("kind:length_prefix", LengthPrefixCoder.class)
+ .put("kind:windowed_value", WindowedValue.FullWindowedValueCoder.class)
+ .build();
+
+ // Delay initialization of statics until the first call to Serializer.
+ private static class SingletonHelper {
+ static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+ static final ObjectMapper TREE_MAPPER = createTreeMapper();
+
+ /**
+ * Creates the object mapper that will be used for serializing Google API
+ * client maps into Jackson trees.
+ */
+ private static ObjectMapper createTreeMapper() {
+ return new ObjectMapper();
+ }
+
+ /**
+ * Creates the object mapper that will be used for deserializing Jackson
+ * trees into objects.
+ */
+ private static ObjectMapper createObjectMapper() {
+ ObjectMapper m = new ObjectMapper();
+ // Ignore properties that are not used by the object.
+ m.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+
+ // For parameters of type Object, use the @type property to determine the
+ // class to instantiate.
+ //
+ // TODO: It would be ideal to do this for all non-final classes. The
+ // problem with using DefaultTyping.NON_FINAL is that it insists on having
+ // type information in the JSON for classes with useful default
+ // implementations, such as List. Ideally, we'd combine these defaults
+ // with available type information if that information's present.
+ m.enableDefaultTypingAsProperty(
+ ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT,
+ PropertyNames.OBJECT_TYPE_NAME);
+
+ m.registerModule(new Jackson2Module());
+
+ return m;
+ }
+ }
+
+ /**
+ * Deserializes an object from a Dataflow structured encoding (represented in
+ * Java as a map).
+ *
+ * <p>The standard Dataflow SDK object serialization protocol is based on JSON.
+ * Data is typically encoded as a JSON object whose fields represent the
+ * object's data.
+ *
+ * <p>The actual deserialization is performed by Jackson, which can deserialize
+ * public fields, use JavaBean setters, or use injection annotations to
+ * indicate how to construct the object. The {@link ObjectMapper} used is
+ * configured to use the "@type" field as the name of the class to instantiate
+ * (supporting polymorphic types), and may be further configured by
+ * annotations or via {@link ObjectMapper#registerModule}.
+ *
+ * @see <a href="http://wiki.fasterxml.com/JacksonFAQ#Data_Binding.2C_general">
+ * Jackson Data-Binding</a>
+ * @see <a href="https://github.com/FasterXML/jackson-annotations/wiki/Jackson-Annotations">
+ * Jackson-Annotations</a>
+ * @param serialized the object in untyped decoded form (i.e. a nested {@link Map})
+ * @param clazz the expected object class
+ */
+ public static <T> T deserialize(Map<String, Object> serialized, Class<T> clazz) {
+ try {
+ return SingletonHelper.OBJECT_MAPPER.treeToValue(
+ SingletonHelper.TREE_MAPPER.valueToTree(
+ deserializeCloudKnownTypes(serialized)),
+ clazz);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(
+ "Unable to deserialize class " + clazz, e);
+ }
+ }
+
+ /**
+ * Recursively walks the supplied map, looking for well-known cloud type information (keyed as
+ * {@link PropertyNames#OBJECT_TYPE_NAME}, matching a URI value from the {@link CloudKnownType}
+ * enum). Upon finding this type information, it converts it into the correspondingly typed
+ * Java value.
+ */
+ @SuppressWarnings("unchecked")
+ private static Object deserializeCloudKnownTypes(Object src) {
+ if (src instanceof Map) {
+ Map<String, Object> srcMap = (Map<String, Object>) src;
+ @Nullable Object value = srcMap.get(PropertyNames.SCALAR_FIELD_NAME);
+ @Nullable CloudKnownType type =
+ CloudKnownType.forUri((String) srcMap.get(PropertyNames.OBJECT_TYPE_NAME));
+ if (type != null && value != null) {
+ // It's a value of a well-known cloud type; let the known type handler
+ // handle the translation.
+ Object result = type.parse(value, type.defaultClass());
+ return result;
+ }
+ // Otherwise, it's just an ordinary map.
+ Map<String, Object> dest = new HashMap<>(srcMap.size());
+ for (Map.Entry<String, Object> entry : srcMap.entrySet()) {
+ dest.put(entry.getKey(), deserializeCloudKnownTypes(entry.getValue()));
+ }
+ return dest;
+ }
+ if (src instanceof List) {
+ List<Object> srcList = (List<Object>) src;
+ List<Object> dest = new ArrayList<>(srcList.size());
+ for (Object obj : srcList) {
+ dest.add(deserializeCloudKnownTypes(obj));
+ }
+ return dest;
+ }
+ // Neither a Map nor a List; no translation needed.
+ return src;
+ }
+
+ /**
+ * A {@link com.fasterxml.jackson.databind.Module} that adds the type
+ * resolver needed for Coder definitions.
+ */
+ static final class Jackson2Module extends SimpleModule {
+ /**
+ * The Coder custom type resolver.
+ *
+ * <p>This resolver resolves coders. If the Coder ID is a particular
+ * well-known identifier, it's replaced with the corresponding class.
+ * All other Coder instances are resolved by class name, using the package
+ * org.apache.beam.sdk.coders if there are no "."s in the ID.
+ */
+ private static final class Resolver extends TypeIdResolverBase {
+ @SuppressWarnings("unused") // Used via @JsonTypeIdResolver annotation on Mixin
+ public Resolver() {
+ super(TypeFactory.defaultInstance().constructType(Coder.class),
+ TypeFactory.defaultInstance());
+ }
+
+ @Override
+ public JavaType typeFromId(DatabindContext context, String id) {
+ Class<?> clazz = getClassForId(id);
+ @SuppressWarnings("rawtypes")
+ TypeVariable[] tvs = clazz.getTypeParameters();
+ JavaType[] types = new JavaType[tvs.length];
+ for (int lupe = 0; lupe < tvs.length; lupe++) {
+ types[lupe] = TypeFactory.unknownType();
+ }
+ return _typeFactory.constructSimpleType(clazz, types);
+ }
+
+ private Class<?> getClassForId(String id) {
+ try {
+ if (id.contains(".")) {
+ return Class.forName(id);
+ }
+
+ if (WELL_KNOWN_CODER_TYPES.containsKey(id)) {
+ return WELL_KNOWN_CODER_TYPES.get(id);
+ }
+
+ // Otherwise, see if the ID is the name of a class in
+ // org.apache.beam.sdk.coders. We do this via creating
+ // the class object so that class loaders have a chance to get
+ // involved -- and since we need the class object anyway.
+ return Class.forName(Coder.class.getPackage().getName() + "." + id);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Unable to convert coder ID " + id + " to class", e);
+ }
+ }
+
+ @Override
+ public String idFromValueAndType(Object o, Class<?> clazz) {
+ return clazz.getName();
+ }
+
+ @Override
+ public String idFromValue(Object o) {
+ return o.getClass().getName();
+ }
+
+ @Override
+ public JsonTypeInfo.Id getMechanism() {
+ return JsonTypeInfo.Id.CUSTOM;
+ }
+ }
+
+ /**
+ * The mixin class defining how Coders are handled by the deserialization
+ * {@link ObjectMapper}.
+ *
+ * <p>This is done via a mixin so that this resolver is <i>only</i> used
+ * during deserialization requested by the Apache Beam SDK.
+ */
+ @JsonTypeIdResolver(Resolver.class)
+ @JsonTypeInfo(use = Id.CUSTOM, include = As.PROPERTY, property = PropertyNames.OBJECT_TYPE_NAME)
+ private static final class Mixin {}
+
+ public Jackson2Module() {
+ super("BeamCoders");
+ setMixInAnnotation(Coder.class, Mixin.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java
index 430319b..c9e7427 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java
@@ -22,7 +22,12 @@ import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
-/** A utility for manipulating well-known cloud types. */
+/**
+ * A utility for manipulating well-known cloud types.
+ *
+ * @deprecated replaced by {@code org.apache.beam.runners.dataflow.util.CloudKnownType}
+ */
+@Deprecated
enum CloudKnownType {
TEXT("http://schema.org/Text", String.class) {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java
index 9cab453..061e56a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java
@@ -35,7 +35,10 @@ import javax.annotation.Nullable;
* {@code CloudObject} is constructed, the method should explicitly add
* additional properties to be presented during deserialization, representing
* child objects by building additional {@code CloudObject}s.
+ *
+ * @deprecated replaced by {@code org.apache.beam.runners.dataflow.util.CloudObject}
*/
+@Deprecated
public final class CloudObject extends GenericJson {
/**
* Constructs a {@code CloudObject} by copying the supplied serialized object
http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
index 857f903..2d21561 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.util;
-import static org.apache.beam.sdk.util.Structs.addList;
-
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
@@ -217,20 +215,13 @@ public final class CoderUtils {
return codedType;
}
- public static CloudObject makeCloudEncoding(
- String type,
- CloudObject... componentSpecs) {
- CloudObject encoding = CloudObject.forClassName(type);
- if (componentSpecs.length > 0) {
- addList(encoding, PropertyNames.COMPONENT_ENCODINGS, componentSpecs);
- }
- return encoding;
- }
-
/**
* A {@link com.fasterxml.jackson.databind.Module} that adds the type
* resolver needed for Coder definitions.
+ *
+ * <p>Used only in {@link Serializer}, which is moving to the Dataflow runner module.
*/
+ @Deprecated
static final class Jackson2Module extends SimpleModule {
/**
* The Coder custom type resolver.
http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java
index 86a3b8e..166e4e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java
@@ -28,7 +28,10 @@ import javax.annotation.Nullable;
/**
* Utility for converting objects between Java and Cloud representations.
+ *
+ * @deprecated replaced by {@code org.apache.beam.runners.dataflow.util.Serializer}
*/
+@Deprecated
public final class Serializer {
// Delay initialization of statics until the first call to Serializer.
private static class SingletonHelper {
http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
index 32c2af4..0db5355 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.util;
-import static org.apache.beam.sdk.util.CoderUtils.makeCloudEncoding;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doThrow;
@@ -25,18 +24,11 @@ import static org.mockito.Mockito.mock;
import java.io.InputStream;
import java.io.OutputStream;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.CoderPropertiesTest.ClosingCoder;
-import org.hamcrest.CoreMatchers;
-import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -89,102 +81,6 @@ public class CoderUtilsTest {
}
@Test
- public void testCreateAtomicCoders() throws Exception {
- Assert.assertEquals(
- BigEndianIntegerCoder.of(),
- Serializer.deserialize(makeCloudEncoding("BigEndianIntegerCoder"), Coder.class));
- Assert.assertEquals(
- StringUtf8Coder.of(),
- Serializer.deserialize(
- makeCloudEncoding(StringUtf8Coder.class.getName()), Coder.class));
- Assert.assertEquals(
- VoidCoder.of(),
- Serializer.deserialize(makeCloudEncoding("VoidCoder"), Coder.class));
- Assert.assertEquals(
- TestCoder.of(),
- Serializer.deserialize(makeCloudEncoding(TestCoder.class.getName()), Coder.class));
- }
-
- @Test
- public void testCreateCompositeCoders() throws Exception {
- Assert.assertEquals(
- IterableCoder.of(StringUtf8Coder.of()),
- Serializer.deserialize(
- makeCloudEncoding("IterableCoder",
- makeCloudEncoding("StringUtf8Coder")), Coder.class));
- Assert.assertEquals(
- KvCoder.of(BigEndianIntegerCoder.of(), VoidCoder.of()),
- Serializer.deserialize(
- makeCloudEncoding(
- "KvCoder",
- makeCloudEncoding(BigEndianIntegerCoder.class.getName()),
- makeCloudEncoding("VoidCoder")), Coder.class));
- Assert.assertEquals(
- IterableCoder.of(
- KvCoder.of(IterableCoder.of(BigEndianIntegerCoder.of()),
- KvCoder.of(VoidCoder.of(),
- TestCoder.of()))),
- Serializer.deserialize(
- makeCloudEncoding(
- IterableCoder.class.getName(),
- makeCloudEncoding(
- KvCoder.class.getName(),
- makeCloudEncoding(
- "IterableCoder",
- makeCloudEncoding("BigEndianIntegerCoder")),
- makeCloudEncoding(
- "KvCoder",
- makeCloudEncoding("VoidCoder"),
- makeCloudEncoding(TestCoder.class.getName())))), Coder.class));
- }
-
- @Test
- public void testCreateUntypedCoders() throws Exception {
- Assert.assertEquals(
- IterableCoder.of(StringUtf8Coder.of()),
- Serializer.deserialize(
- makeCloudEncoding(
- "kind:stream",
- makeCloudEncoding("StringUtf8Coder")), Coder.class));
- Assert.assertEquals(
- KvCoder.of(BigEndianIntegerCoder.of(), VoidCoder.of()),
- Serializer.deserialize(
- makeCloudEncoding(
- "kind:pair",
- makeCloudEncoding(BigEndianIntegerCoder.class.getName()),
- makeCloudEncoding("VoidCoder")), Coder.class));
- Assert.assertEquals(
- IterableCoder.of(
- KvCoder.of(IterableCoder.of(BigEndianIntegerCoder.of()),
- KvCoder.of(VoidCoder.of(),
- TestCoder.of()))),
- Serializer.deserialize(
- makeCloudEncoding(
- "kind:stream",
- makeCloudEncoding(
- "kind:pair",
- makeCloudEncoding(
- "kind:stream",
- makeCloudEncoding("BigEndianIntegerCoder")),
- makeCloudEncoding(
- "kind:pair",
- makeCloudEncoding("VoidCoder"),
- makeCloudEncoding(TestCoder.class.getName())))), Coder.class));
- }
-
- @Test
- public void testCreateUnknownCoder() throws Exception {
- try {
- Serializer.deserialize(makeCloudEncoding("UnknownCoder"), Coder.class);
- Assert.fail("should have thrown an exception");
- } catch (Exception exn) {
- Assert.assertThat(exn.toString(),
- CoreMatchers.containsString(
- "Unable to convert coder ID UnknownCoder to class"));
- }
- }
-
- @Test
public void testClosingCoderFailsWhenDecodingBase64() throws Exception {
expectedException.expect(UnsupportedOperationException.class);
expectedException.expectMessage("Caller does not own the underlying");
[2/2] beam git commit: [BEAM-2020] Copy CloudObject to the Dataflow
Module
Posted by lc...@apache.org.
[BEAM-2020] Copy CloudObject to the Dataflow Module
This closes #2808
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3afd338f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3afd338f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3afd338f
Branch: refs/heads/master
Commit: 3afd338fd928b4bbd43c29cd75c48e61aabb2118
Parents: 7f50ea2 79b364f
Author: Luke Cwik <lc...@google.com>
Authored: Mon May 1 18:03:28 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon May 1 18:03:28 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/util/CloudKnownType.java | 138 ++++++++++
.../beam/runners/dataflow/util/CloudObject.java | 185 +++++++++++++
.../beam/runners/dataflow/util/Serializer.java | 262 +++++++++++++++++++
.../apache/beam/sdk/util/CloudKnownType.java | 7 +-
.../org/apache/beam/sdk/util/CloudObject.java | 3 +
.../org/apache/beam/sdk/util/CoderUtils.java | 15 +-
.../org/apache/beam/sdk/util/Serializer.java | 3 +
.../apache/beam/sdk/util/CoderUtilsTest.java | 104 --------
8 files changed, 600 insertions(+), 117 deletions(-)
----------------------------------------------------------------------