You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/02 01:03:39 UTC

[1/2] beam git commit: Copy CloudObject to the Dataflow Module

Repository: beam
Updated Branches:
  refs/heads/master 7f50ea2e5 -> 3afd338fd


Copy CloudObject to the Dataflow Module

Once the Dataflow worker has migrated to these copies, the original classes
can be removed from the SDK.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/79b364f7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/79b364f7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/79b364f7

Branch: refs/heads/master
Commit: 79b364f7df6122e438a4ee0f12b5cdc7cb694d91
Parents: 7f50ea2
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 1 14:31:17 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon May 1 18:02:23 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/util/CloudKnownType.java   | 138 ++++++++++
 .../beam/runners/dataflow/util/CloudObject.java | 185 +++++++++++++
 .../beam/runners/dataflow/util/Serializer.java  | 262 +++++++++++++++++++
 .../apache/beam/sdk/util/CloudKnownType.java    |   7 +-
 .../org/apache/beam/sdk/util/CloudObject.java   |   3 +
 .../org/apache/beam/sdk/util/CoderUtils.java    |  15 +-
 .../org/apache/beam/sdk/util/Serializer.java    |   3 +
 .../apache/beam/sdk/util/CoderUtilsTest.java    | 104 --------
 8 files changed, 600 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java
new file mode 100644
index 0000000..ce23a1b
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/** Well-known cloud types, each identified by a schema.org URI and mapped to Java classes. */
+enum CloudKnownType {
+  TEXT("http://schema.org/Text", String.class) {
+    @Override
+    public <T> T parse(Object value, Class<T> clazz) {
+      return clazz.cast(value); // no conversion; ClassCastException on a type mismatch
+    }
+  },
+  BOOLEAN("http://schema.org/Boolean", Boolean.class) {
+    @Override
+    public <T> T parse(Object value, Class<T> clazz) {
+      return clazz.cast(value); // no conversion; ClassCastException on a type mismatch
+    }
+  },
+  INTEGER("http://schema.org/Integer", Long.class, Integer.class) { // Long is the default class
+    @Override
+    public <T> T parse(Object value, Class<T> clazz) {
+      Object result = null; // stays null if no conversion branch below applies
+      if (value.getClass() == clazz) {
+        result = value;
+      } else if (clazz == Long.class) {
+        if (value instanceof Integer) {
+          result = ((Integer) value).longValue();
+        } else if (value instanceof String) {
+          result = Long.valueOf((String) value);
+        }
+      } else if (clazz == Integer.class) {
+        if (value instanceof Long) {
+          result = ((Long) value).intValue(); // narrowing: high-order bits are discarded
+        } else if (value instanceof String) {
+          result = Integer.valueOf((String) value);
+        }
+      }
+      return clazz.cast(result);
+    }
+  },
+  FLOAT("http://schema.org/Float", Double.class, Float.class) { // Double is the default class
+    @Override
+    public <T> T parse(Object value, Class<T> clazz) {
+      Object result = null; // stays null if no conversion branch below applies
+      if (value.getClass() == clazz) {
+        result = value;
+      } else if (clazz == Double.class) {
+        if (value instanceof Float) {
+          result = ((Float) value).doubleValue();
+        } else if (value instanceof String) {
+          result = Double.valueOf((String) value);
+        }
+      } else if (clazz == Float.class) {
+        if (value instanceof Double) {
+          result = ((Double) value).floatValue(); // narrowing: may lose precision or range
+        } else if (value instanceof String) {
+          result = Float.valueOf((String) value);
+        }
+      }
+      return clazz.cast(result);
+    }
+  };
+
+  private final String uri;
+  private final Class<?>[] classes; // classes[0] is the default class; see defaultClass()
+
+  CloudKnownType(String uri, Class<?>... classes) {
+    this.uri = uri;
+    this.classes = classes;
+  }
+
+  public String getUri() { // the schema.org URI identifying this type
+    return uri;
+  }
+
+  public abstract <T> T parse(Object value, Class<T> clazz); // converts value to clazz
+
+  public Class<?> defaultClass() {
+    return classes[0];
+  }
+
+  private static final Map<String, CloudKnownType> typesByUri =
+      Collections.unmodifiableMap(buildTypesByUri()); // URI -> type, built once
+
+  private static Map<String, CloudKnownType> buildTypesByUri() {
+    Map<String, CloudKnownType> result = new HashMap<>();
+    for (CloudKnownType ty : CloudKnownType.values()) {
+      result.put(ty.getUri(), ty);
+    }
+    return result;
+  }
+
+  @Nullable
+  public static CloudKnownType forUri(@Nullable String uri) { // null for a null or unknown URI
+    if (uri == null) {
+      return null;
+    }
+    return typesByUri.get(uri);
+  }
+
+  private static final Map<Class<?>, CloudKnownType> typesByClass =
+  Collections.unmodifiableMap(buildTypesByClass()); // every accepted class -> its type
+
+  private static Map<Class<?>, CloudKnownType> buildTypesByClass() {
+    Map<Class<?>, CloudKnownType> result = new HashMap<>();
+    for (CloudKnownType ty : CloudKnownType.values()) {
+      for (Class<?> clazz : ty.classes) {
+        result.put(clazz, ty);
+      }
+    }
+    return result;
+  }
+
+  @Nullable
+  public static CloudKnownType forClass(Class<?> clazz) { // null for an unknown class
+    return typesByClass.get(clazz);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java
new file mode 100644
index 0000000..e4dd9be
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.util.Key;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.PropertyNames;
+
+/**
+ * A representation of an arbitrary Java object to be instantiated by Dataflow
+ * workers, carried as JSON via {@link GenericJson}.
+ *
+ * <p>Typically, an object to be written by the SDK to the Dataflow service will
+ * implement a method (typically called {@code asCloudObject()}) that returns a
+ * {@code CloudObject} to represent the object in the protocol.  Once the
+ * {@code CloudObject} is constructed, the method should explicitly add
+ * additional properties to be presented during deserialization, representing
+ * child objects by building additional {@code CloudObject}s.
+ */
+public final class CloudObject extends GenericJson {
+  /**
+   * Constructs a {@code CloudObject} by copying the supplied serialized object
+   * spec, which must represent an SDK object serialized for transport via the
+   * Dataflow API.
+   *
+   * <p>The most common use of this method is during deserialization on the worker,
+   * where it's used as a binding type during instance construction.
+   *
+   * @param spec supplies the serialized form of the object as a nested map
+   * @throws RuntimeException if the supplied map does not represent an SDK object
+   */
+  public static CloudObject fromSpec(Map<String, Object> spec) {
+    CloudObject result = new CloudObject();
+    result.putAll(spec); // expected to fill className via its @Key mapping
+    if (result.className == null) { // spec had no OBJECT_TYPE_NAME entry
+      throw new RuntimeException("Unable to create an SDK object from " + spec
+          + ": Object class not specified (missing \""
+          + PropertyNames.OBJECT_TYPE_NAME + "\" field)");
+    }
+    return result;
+  }
+
+  /**
+   * Constructs a {@code CloudObject} to be used for serializing an instance of
+   * the supplied class for transport via the Dataflow API.  The instance
+   * parameters to be serialized must be supplied explicitly after the
+   * {@code CloudObject} is created, by using {@link CloudObject#put}.
+   *
+   * @param cls the class to use when deserializing the object on the worker
+   */
+  public static CloudObject forClass(Class<?> cls) {
+    CloudObject result = new CloudObject();
+    result.className = checkNotNull(cls).getName();
+    return result;
+  }
+
+  /**
+   * Constructs a {@code CloudObject} to be used for serializing data to be
+   * deserialized using the supplied class name (or well-known type URI) for
+   * transport via the Dataflow API.  The instance parameters to be serialized
+   * must be supplied explicitly after the {@code CloudObject} is created, by
+   * using {@link CloudObject#put}.
+   *
+   * @param className the class to use when deserializing the object on the worker
+   */
+  public static CloudObject forClassName(String className) {
+    CloudObject result = new CloudObject();
+    result.className = checkNotNull(className);
+    return result;
+  }
+
+  /**
+   * Constructs a {@code CloudObject} of type {@link CloudKnownType#TEXT} for the given value.
+   * @param value the scalar value to represent.
+   */
+  public static CloudObject forString(String value) {
+    CloudObject result = forClassName(CloudKnownType.TEXT.getUri());
+    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
+    return result;
+  }
+
+  /**
+   * Constructs a {@code CloudObject} of type {@link CloudKnownType#BOOLEAN} for the given value.
+   * @param value the scalar value to represent.
+   */
+  public static CloudObject forBoolean(Boolean value) {
+    CloudObject result = forClassName(CloudKnownType.BOOLEAN.getUri());
+    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
+    return result;
+  }
+
+  /**
+   * Constructs a {@code CloudObject} of type {@link CloudKnownType#INTEGER} for the given value.
+   * @param value the scalar value to represent.
+   */
+  public static CloudObject forInteger(Long value) {
+    CloudObject result = forClassName(CloudKnownType.INTEGER.getUri());
+    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
+    return result;
+  }
+
+  /**
+   * Constructs a {@code CloudObject} of type {@link CloudKnownType#INTEGER} for the given value.
+   * @param value the scalar value to represent.
+   */
+  public static CloudObject forInteger(Integer value) {
+    CloudObject result = forClassName(CloudKnownType.INTEGER.getUri());
+    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
+    return result;
+  }
+
+  /**
+   * Constructs a {@code CloudObject} of type {@link CloudKnownType#FLOAT} for the given value.
+   * @param value the scalar value to represent.
+   */
+  public static CloudObject forFloat(Float value) {
+    CloudObject result = forClassName(CloudKnownType.FLOAT.getUri());
+    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
+    return result;
+  }
+
+  /**
+   * Constructs a {@code CloudObject} of type {@link CloudKnownType#FLOAT} for the given value.
+   * @param value the scalar value to represent.
+   */
+  public static CloudObject forFloat(Double value) {
+    CloudObject result = forClassName(CloudKnownType.FLOAT.getUri());
+    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
+    return result;
+  }
+
+  /**
+   * Constructs a {@code CloudObject} representing the given value of a
+   * well-known cloud object type.
+   * @param value the non-null scalar value to represent.
+   * @throws RuntimeException if the value does not have a
+   * {@link CloudKnownType} mapping
+   */
+  public static CloudObject forKnownType(Object value) {
+    @Nullable CloudKnownType ty = CloudKnownType.forClass(value.getClass()); // NPE if value is null
+    if (ty == null) {
+      throw new RuntimeException("Unable to represent value via the Dataflow API: " + value);
+    }
+    CloudObject result = forClassName(ty.getUri());
+    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
+    return result;
+  }
+
+  @Key(PropertyNames.OBJECT_TYPE_NAME)
+  private String className; // a Java class name or a well-known type URI
+
+  private CloudObject() {} // instances are created through the static factories
+
+  /**
+   * Gets the Java class name (or well-known type URI) that this CloudObject represents.
+   */
+  public String getClassName() {
+    return className;
+  }
+
+  @Override
+  public CloudObject clone() { // covariant return type, so callers need no cast
+    return (CloudObject) super.clone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java
new file mode 100644
index 0000000..e2bcafe
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Serializer.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DatabindContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
+import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import com.google.common.collect.ImmutableMap;
+import java.lang.reflect.TypeVariable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Utility for converting objects from their Cloud (JSON map) representation into Java objects.
+ *
+ * @deprecated Will no longer be used once all coders are converted via {@link CloudObjects}.
+ */
+@Deprecated
+public final class Serializer {
+  /** Maps well-known coder type IDs (e.g. {@code kind:pair}) to their implementing classes. */
+  private static final Map<String, Class<?>> WELL_KNOWN_CODER_TYPES =
+      ImmutableMap.<String, Class<?>>builder()
+          .put("kind:pair", KvCoder.class)
+          .put("kind:stream", IterableCoder.class)
+          .put("kind:global_window", GlobalWindow.Coder.class)
+          .put("kind:interval_window", IntervalWindow.IntervalWindowCoder.class)
+          .put("kind:length_prefix", LengthPrefixCoder.class)
+          .put("kind:windowed_value", WindowedValue.FullWindowedValueCoder.class)
+          .build();
+
+  // Holder class: both mappers are created only when SingletonHelper is first accessed.
+  private static class SingletonHelper {
+    static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+    static final ObjectMapper TREE_MAPPER = createTreeMapper();
+
+    /**
+     * Creates the object mapper that will be used for serializing Google API
+     * client maps into Jackson trees.
+     */
+    private static ObjectMapper createTreeMapper() {
+      return new ObjectMapper();
+    }
+
+    /**
+     * Creates the object mapper that will be used for deserializing Jackson
+     * trees into objects.
+     */
+    private static ObjectMapper createObjectMapper() {
+      ObjectMapper m = new ObjectMapper();
+      // Ignore properties that are not used by the object.
+      m.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+
+      // For parameters of type Object, use the @type property to determine the
+      // class to instantiate.
+      //
+      // TODO: It would be ideal to do this for all non-final classes.  The
+      // problem with using DefaultTyping.NON_FINAL is that it insists on having
+      // type information in the JSON for classes with useful default
+      // implementations, such as List.  Ideally, we'd combine these defaults
+      // with available type information if that information's present.
+      m.enableDefaultTypingAsProperty(
+           ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT,
+           PropertyNames.OBJECT_TYPE_NAME);
+
+      m.registerModule(new Jackson2Module());
+
+      return m;
+    }
+  }
+
+  /**
+   * Deserializes an object from a Dataflow structured encoding (represented in
+   * Java as a map).
+   *
+   * <p>The standard Dataflow SDK object serialization protocol is based on JSON.
+   * Data is typically encoded as a JSON object whose fields represent the
+   * object's data.
+   *
+   * <p>The actual deserialization is performed by Jackson, which can deserialize
+   * public fields, use JavaBean setters, or use injection annotations to
+   * indicate how to construct the object.  The {@link ObjectMapper} used is
+   * configured to use the "@type" field as the name of the class to instantiate
+   * (supporting polymorphic types), and may be further configured by
+   * annotations or via {@link ObjectMapper#registerModule}.
+   *
+   * @see <a href="http://wiki.fasterxml.com/JacksonFAQ#Data_Binding.2C_general">
+   * Jackson Data-Binding</a>
+   * @see <a href="https://github.com/FasterXML/jackson-annotations/wiki/Jackson-Annotations">
+   * Jackson-Annotations</a>
+   * @param serialized the object in untyped decoded form (i.e. a nested {@link Map})
+   * @param clazz the expected object class
+   */
+  public static <T> T deserialize(Map<String, Object> serialized, Class<T> clazz) {
+    try {
+      return SingletonHelper.OBJECT_MAPPER.treeToValue(
+          SingletonHelper.TREE_MAPPER.valueToTree(
+              deserializeCloudKnownTypes(serialized)),
+          clazz);
+    } catch (JsonProcessingException e) { // surfaced to callers as an unchecked exception
+      throw new RuntimeException(
+          "Unable to deserialize class " + clazz, e);
+    }
+  }
+
+  /**
+   * Recursively walks the supplied map, looking for well-known cloud type information (keyed as
+   * {@link PropertyNames#OBJECT_TYPE_NAME}, matching a URI value from the {@link CloudKnownType}
+   * enum. Upon finding this type information, it converts it into the correspondingly typed Java
+   * value.
+   */
+  @SuppressWarnings("unchecked")
+  private static Object deserializeCloudKnownTypes(Object src) {
+    if (src instanceof Map) {
+      Map<String, Object> srcMap = (Map<String, Object>) src;
+      @Nullable Object value = srcMap.get(PropertyNames.SCALAR_FIELD_NAME);
+      @Nullable CloudKnownType type =
+          CloudKnownType.forUri((String) srcMap.get(PropertyNames.OBJECT_TYPE_NAME));
+      if (type != null && value != null) {
+        // It's a value of a well-known cloud type; let the known type handler
+        // handle the translation.
+        Object result = type.parse(value, type.defaultClass());
+        return result;
+      }
+      // Otherwise, it's just an ordinary map.
+      Map<String, Object> dest = new HashMap<>(srcMap.size());
+      for (Map.Entry<String, Object> entry : srcMap.entrySet()) {
+        dest.put(entry.getKey(), deserializeCloudKnownTypes(entry.getValue()));
+      }
+      return dest;
+    }
+    if (src instanceof List) {
+      List<Object> srcList = (List<Object>) src;
+      List<Object> dest = new ArrayList<>(srcList.size());
+      for (Object obj : srcList) {
+        dest.add(deserializeCloudKnownTypes(obj));
+      }
+      return dest;
+    }
+    // Neither a Map nor a List; no translation needed.
+    return src;
+  }
+
+  /**
+   * A {@link com.fasterxml.jackson.databind.Module} that adds the type
+   * resolver needed for Coder definitions.
+   */
+  static final class Jackson2Module extends SimpleModule {
+    /**
+     * The Coder custom type resolver.
+     *
+     * <p>This resolver resolves coders. If the Coder ID is a particular
+     * well-known identifier, it's replaced with the corresponding class.
+     * All other Coder instances are resolved by class name, using the package
+     * org.apache.beam.sdk.coders if there are no "."s in the ID.
+     */
+    private static final class Resolver extends TypeIdResolverBase {
+      @SuppressWarnings("unused") // Used via @JsonTypeIdResolver annotation on Mixin
+      public Resolver() {
+        super(TypeFactory.defaultInstance().constructType(Coder.class),
+            TypeFactory.defaultInstance());
+      }
+
+      @Override
+      public JavaType typeFromId(DatabindContext context, String id) {
+        Class<?> clazz = getClassForId(id);
+        @SuppressWarnings("rawtypes")
+        TypeVariable[] tvs = clazz.getTypeParameters();
+        JavaType[] types = new JavaType[tvs.length];
+        for (int lupe = 0; lupe < tvs.length; lupe++) { // leave all type parameters unresolved
+          types[lupe] = TypeFactory.unknownType();
+        }
+        return _typeFactory.constructSimpleType(clazz, types);
+      }
+
+      private Class<?> getClassForId(String id) {
+        try {
+          if (id.contains(".")) { // treat as a fully-qualified class name
+            return Class.forName(id);
+          }
+
+          if (WELL_KNOWN_CODER_TYPES.containsKey(id)) { // short IDs such as "kind:pair"
+            return WELL_KNOWN_CODER_TYPES.get(id);
+          }
+
+          // Otherwise, see if the ID is the name of a class in
+          // org.apache.beam.sdk.coders.  We do this via creating
+          // the class object so that class loaders have a chance to get
+          // involved -- and since we need the class object anyway.
+          return Class.forName(Coder.class.getPackage().getName() + "." + id);
+        } catch (ClassNotFoundException e) {
+          throw new RuntimeException("Unable to convert coder ID " + id + " to class", e);
+        }
+      }
+
+      @Override
+      public String idFromValueAndType(Object o, Class<?> clazz) {
+        return clazz.getName();
+      }
+
+      @Override
+      public String idFromValue(Object o) {
+        return o.getClass().getName();
+      }
+
+      @Override
+      public JsonTypeInfo.Id getMechanism() {
+        return JsonTypeInfo.Id.CUSTOM;
+      }
+    }
+
+    /**
+     * The mixin class defining how Coders are handled by the deserialization
+     * {@link ObjectMapper}.
+     *
+     * <p>This is done via a mixin so that this resolver is <i>only</i> used
+     * during deserialization requested by the Apache Beam SDK.
+     */
+    @JsonTypeIdResolver(Resolver.class)
+    @JsonTypeInfo(use = Id.CUSTOM, include = As.PROPERTY, property = PropertyNames.OBJECT_TYPE_NAME)
+    private static final class Mixin {}
+
+    public Jackson2Module() {
+      super("BeamCoders");
+      setMixInAnnotation(Coder.class, Mixin.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java
index 430319b..c9e7427 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudKnownType.java
@@ -22,7 +22,12 @@ import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
 
-/** A utility for manipulating well-known cloud types. */
+/**
+ * A utility for manipulating well-known cloud types.
+ *
+ * @deprecated replaced by {@code org.apache.beam.runners.dataflow.util.CloudKnownType}
+ */
+@Deprecated
 enum CloudKnownType {
   TEXT("http://schema.org/Text", String.class) {
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java
index 9cab453..061e56a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CloudObject.java
@@ -35,7 +35,10 @@ import javax.annotation.Nullable;
  * {@code CloudObject} is constructed, the method should explicitly add
  * additional properties to be presented during deserialization, representing
  * child objects by building additional {@code CloudObject}s.
+ *
+ * @deprecated replaced by {@code org.apache.beam.runners.dataflow.util.CloudObject}
  */
+@Deprecated
 public final class CloudObject extends GenericJson {
   /**
    * Constructs a {@code CloudObject} by copying the supplied serialized object

http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
index 857f903..2d21561 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.util;
 
-import static org.apache.beam.sdk.util.Structs.addList;
-
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
 import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
@@ -217,20 +215,13 @@ public final class CoderUtils {
     return codedType;
   }
 
-  public static CloudObject makeCloudEncoding(
-      String type,
-      CloudObject... componentSpecs) {
-    CloudObject encoding = CloudObject.forClassName(type);
-    if (componentSpecs.length > 0) {
-      addList(encoding, PropertyNames.COMPONENT_ENCODINGS, componentSpecs);
-    }
-    return encoding;
-  }
-
   /**
    * A {@link com.fasterxml.jackson.databind.Module} that adds the type
    * resolver needed for Coder definitions.
+   *
+   * <p>Used only by {@link Serializer}, which has been copied to the Dataflow runner module.
    */
+  @Deprecated
   static final class Jackson2Module extends SimpleModule {
     /**
      * The Coder custom type resolver.

http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java
index 86a3b8e..166e4e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Serializer.java
@@ -28,7 +28,10 @@ import javax.annotation.Nullable;
 
 /**
  * Utility for converting objects between Java and Cloud representations.
+ *
+ * @deprecated replaced by {@code org.apache.beam.runners.dataflow.util.Serializer}
  */
+@Deprecated
 public final class Serializer {
   // Delay initialization of statics until the first call to Serializer.
   private static class SingletonHelper {

http://git-wip-us.apache.org/repos/asf/beam/blob/79b364f7/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
index 32c2af4..0db5355 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.util;
 
-import static org.apache.beam.sdk.util.CoderUtils.makeCloudEncoding;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doThrow;
@@ -25,18 +24,11 @@ import static org.mockito.Mockito.mock;
 
 import java.io.InputStream;
 import java.io.OutputStream;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.CoderPropertiesTest.ClosingCoder;
-import org.hamcrest.CoreMatchers;
-import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -89,102 +81,6 @@ public class CoderUtilsTest {
   }
 
   @Test
-  public void testCreateAtomicCoders() throws Exception {
-    Assert.assertEquals(
-        BigEndianIntegerCoder.of(),
-        Serializer.deserialize(makeCloudEncoding("BigEndianIntegerCoder"), Coder.class));
-    Assert.assertEquals(
-        StringUtf8Coder.of(),
-        Serializer.deserialize(
-            makeCloudEncoding(StringUtf8Coder.class.getName()), Coder.class));
-    Assert.assertEquals(
-        VoidCoder.of(),
-        Serializer.deserialize(makeCloudEncoding("VoidCoder"), Coder.class));
-    Assert.assertEquals(
-        TestCoder.of(),
-        Serializer.deserialize(makeCloudEncoding(TestCoder.class.getName()), Coder.class));
-  }
-
-  @Test
-  public void testCreateCompositeCoders() throws Exception {
-    Assert.assertEquals(
-        IterableCoder.of(StringUtf8Coder.of()),
-        Serializer.deserialize(
-            makeCloudEncoding("IterableCoder",
-                makeCloudEncoding("StringUtf8Coder")), Coder.class));
-    Assert.assertEquals(
-        KvCoder.of(BigEndianIntegerCoder.of(), VoidCoder.of()),
-        Serializer.deserialize(
-            makeCloudEncoding(
-                "KvCoder",
-                makeCloudEncoding(BigEndianIntegerCoder.class.getName()),
-                makeCloudEncoding("VoidCoder")), Coder.class));
-    Assert.assertEquals(
-        IterableCoder.of(
-            KvCoder.of(IterableCoder.of(BigEndianIntegerCoder.of()),
-                       KvCoder.of(VoidCoder.of(),
-                                  TestCoder.of()))),
-        Serializer.deserialize(
-            makeCloudEncoding(
-                IterableCoder.class.getName(),
-                makeCloudEncoding(
-                    KvCoder.class.getName(),
-                    makeCloudEncoding(
-                        "IterableCoder",
-                        makeCloudEncoding("BigEndianIntegerCoder")),
-                    makeCloudEncoding(
-                        "KvCoder",
-                        makeCloudEncoding("VoidCoder"),
-                        makeCloudEncoding(TestCoder.class.getName())))), Coder.class));
-  }
-
-  @Test
-  public void testCreateUntypedCoders() throws Exception {
-    Assert.assertEquals(
-        IterableCoder.of(StringUtf8Coder.of()),
-        Serializer.deserialize(
-            makeCloudEncoding(
-                "kind:stream",
-                makeCloudEncoding("StringUtf8Coder")), Coder.class));
-    Assert.assertEquals(
-        KvCoder.of(BigEndianIntegerCoder.of(), VoidCoder.of()),
-        Serializer.deserialize(
-            makeCloudEncoding(
-                "kind:pair",
-                makeCloudEncoding(BigEndianIntegerCoder.class.getName()),
-                makeCloudEncoding("VoidCoder")), Coder.class));
-    Assert.assertEquals(
-        IterableCoder.of(
-            KvCoder.of(IterableCoder.of(BigEndianIntegerCoder.of()),
-                       KvCoder.of(VoidCoder.of(),
-                                  TestCoder.of()))),
-        Serializer.deserialize(
-            makeCloudEncoding(
-                "kind:stream",
-                makeCloudEncoding(
-                    "kind:pair",
-                    makeCloudEncoding(
-                        "kind:stream",
-                        makeCloudEncoding("BigEndianIntegerCoder")),
-                    makeCloudEncoding(
-                        "kind:pair",
-                        makeCloudEncoding("VoidCoder"),
-                        makeCloudEncoding(TestCoder.class.getName())))), Coder.class));
-  }
-
-  @Test
-  public void testCreateUnknownCoder() throws Exception {
-    try {
-      Serializer.deserialize(makeCloudEncoding("UnknownCoder"), Coder.class);
-      Assert.fail("should have thrown an exception");
-    } catch (Exception exn) {
-      Assert.assertThat(exn.toString(),
-                        CoreMatchers.containsString(
-                            "Unable to convert coder ID UnknownCoder to class"));
-    }
-  }
-
-  @Test
   public void testClosingCoderFailsWhenDecodingBase64() throws Exception {
     expectedException.expect(UnsupportedOperationException.class);
     expectedException.expectMessage("Caller does not own the underlying");


[2/2] beam git commit: [BEAM-2020] Copy CloudObject to the Dataflow Module

Posted by lc...@apache.org.
[BEAM-2020] Copy CloudObject to the Dataflow Module

This closes #2808


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3afd338f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3afd338f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3afd338f

Branch: refs/heads/master
Commit: 3afd338fd928b4bbd43c29cd75c48e61aabb2118
Parents: 7f50ea2 79b364f
Author: Luke Cwik <lc...@google.com>
Authored: Mon May 1 18:03:28 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon May 1 18:03:28 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/util/CloudKnownType.java   | 138 ++++++++++
 .../beam/runners/dataflow/util/CloudObject.java | 185 +++++++++++++
 .../beam/runners/dataflow/util/Serializer.java  | 262 +++++++++++++++++++
 .../apache/beam/sdk/util/CloudKnownType.java    |   7 +-
 .../org/apache/beam/sdk/util/CloudObject.java   |   3 +
 .../org/apache/beam/sdk/util/CoderUtils.java    |  15 +-
 .../org/apache/beam/sdk/util/Serializer.java    |   3 +
 .../apache/beam/sdk/util/CoderUtilsTest.java    | 104 --------
 8 files changed, 600 insertions(+), 117 deletions(-)
----------------------------------------------------------------------