You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/01/09 16:40:09 UTC

git commit: CRUNCH-239: Add a Union PType.

Updated Branches:
  refs/heads/master 64c20ad9c -> 52da56301


CRUNCH-239: Add a Union PType.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/52da5630
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/52da5630
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/52da5630

Branch: refs/heads/master
Commit: 52da56301fac9cdc0baac0ee9b8fd421a5147baa
Parents: 64c20ad
Author: Josh Wills <jw...@apache.org>
Authored: Tue Jan 7 17:28:48 2014 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Jan 8 09:03:54 2014 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/crunch/Union.java  |  65 +++++++++
 .../java/org/apache/crunch/lib/Cogroup.java     |  45 +++---
 .../org/apache/crunch/types/PTypeFamily.java    |   3 +
 .../apache/crunch/types/UnionDeepCopier.java    |  49 +++++++
 .../crunch/types/avro/AvroTypeFamily.java       |   6 +
 .../org/apache/crunch/types/avro/Avros.java     | 138 +++++++++++++++++++
 .../crunch/types/writable/UnionWritable.java    |  72 ++++++++++
 .../types/writable/WritableTypeFamily.java      |   6 +
 .../apache/crunch/types/writable/Writables.java | 103 +++++++++++++-
 9 files changed, 459 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/Union.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Union.java b/crunch-core/src/main/java/org/apache/crunch/Union.java
new file mode 100644
index 0000000..6db1657
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Union.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * A tagged value used when several data sources, possibly holding different types of data, are
+ * combined into a single type: each record carries the index of the source it came from
+ * alongside the record's own value.
+ */
+public class Union {
+
+  private final int index;
+  private final Object value;
+
+  public Union(int index, Object value) {
+    this.index = index;
+    this.value = value;
+  }
+
+  /**
+   * Returns the index of the original data source that this record came from.
+   */
+  public int getIndex() {
+    return index;
+  }
+
+  /**
+   * Returns the underlying value of the record; it may be {@code null}.
+   */
+  public Object getValue() {
+    return value;
+  }
+
+  /**
+   * Two unions are equal when they are of the same class, carry the same index, and their
+   * values are either both {@code null} or equal to each other.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || !getClass().equals(o.getClass())) {
+      return false;
+    }
+    Union other = (Union) o;
+    if (index != other.index) {
+      return false;
+    }
+    return value == null ? other.value == null : value.equals(other.value);
+  }
+
+  @Override
+  public int hashCode() {
+    int valueHash = (value == null) ? 0 : value.hashCode();
+    return 31 * index + valueHash;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
index 9efcb5e..8743a29 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
@@ -27,6 +27,7 @@ import org.apache.crunch.Tuple;
 import org.apache.crunch.Tuple3;
 import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
+import org.apache.crunch.Union;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.TupleFactory;
@@ -211,20 +212,18 @@ public class Cogroup {
     for (int i = 0; i < rest.length; i++) {
       ptypes[i + 1] = rest[i].getValueType();
     }
-    PType<TupleN> itype = ptf.tuples(ptypes);
+    PType<Union> itype = ptf.unionOf(ptypes);
     
-    PTable<K, TupleN> firstInter = first.mapValues("coGroupTag1",
-        new CogroupFn(0, 1 + rest.length),
-        itype);
-    PTable<K, TupleN>[] inter = new PTable[rest.length];
+    PTable<K, Union> firstInter = first.mapValues("coGroupTag1",
+        new CogroupFn(0), itype);
+    PTable<K, Union>[] inter = new PTable[rest.length];
     for (int i = 0; i < rest.length; i++) {
       inter[i] = rest[i].mapValues("coGroupTag" + (i + 2),
-          new CogroupFn(i + 1, 1 + rest.length),
-          itype);
+          new CogroupFn(i + 1), itype);
     }
     
-    PTable<K, TupleN> union = firstInter.union(inter);
-    PGroupedTable<K, TupleN> grouped;
+    PTable<K, Union> union = firstInter.union(inter);
+    PGroupedTable<K, Union> grouped;
     if (numReducers > 0) {
       grouped = union.groupByKey(numReducers);
     } else {
@@ -236,25 +235,21 @@ public class Cogroup {
         outputType);
   }
   
-  private static class CogroupFn<T> extends MapFn<T, TupleN> {
+  private static class CogroupFn<T> extends MapFn<T, Union> {
     private final int index;
-    private final int size;
-    
-    CogroupFn(int index, int size) {
+
+    CogroupFn(int index) {
       this.index = index;
-      this.size = size;
     }
 
     @Override
-    public TupleN map(T input) {
-      Object[] v = new Object[size];
-      v[index] = input;
-      return TupleN.of(v);
+    public Union map(T input) {
+      return new Union(index, input);
     }
   }
 
   private static class PostGroupFn<T extends Tuple> extends
-      MapFn<Iterable<TupleN>, T> {
+      MapFn<Iterable<Union>, T> {
     
     private final TupleFactory factory;
     private final PType[] ptypes;
@@ -273,18 +268,14 @@ public class Cogroup {
     }
     
     @Override
-    public T map(Iterable<TupleN> input) {
+    public T map(Iterable<Union> input) {
       Collection[] collections = new Collection[ptypes.length];
       for (int i = 0; i < ptypes.length; i++) {
         collections[i] = Lists.newArrayList();
       }
-      for (TupleN t : input) {
-        for (int i = 0; i < ptypes.length; i++) {
-          if (t.get(i) != null) {
-            collections[i].add(ptypes[i].getDetachedValue(t.get(i)));
-            break;
-          }
-        }
+      for (Union t : input) {
+        int index = t.getIndex();
+        collections[index].add(ptypes[index].getDetachedValue(t.getValue()));
       }
       return (T) factory.makeTuple(collections);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java
index 9458f14..0ad324a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java
@@ -27,6 +27,7 @@ import org.apache.crunch.Tuple;
 import org.apache.crunch.Tuple3;
 import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
+import org.apache.crunch.Union;
 
 /**
  * An abstract factory for creating {@code PType} instances that have the same
@@ -68,6 +69,8 @@ public interface PTypeFamily {
 
   <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base);
 
+  /**
+   * Returns a {@code PType} for {@link Union} values built over the given element types.
+   */
+  PType<Union> unionOf(PType<?>... ptypes);
+
   <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value);
 
   /**

http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java
new file mode 100644
index 0000000..ba712e0
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import com.google.common.collect.Lists;
+import org.apache.crunch.Union;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+
+/**
+ * A {@link DeepCopier} for {@link Union} values that copies the wrapped value with the
+ * {@code PType} found at the union's index in the list of element types.
+ */
+public class UnionDeepCopier implements DeepCopier<Union> {
+  private final List<PType> elementTypes;
+
+  public UnionDeepCopier(PType... elementTypes) {
+    this.elementTypes = Lists.newArrayList(elementTypes);
+  }
+
+  /**
+   * Passes the configuration on to every element type, in order.
+   */
+  @Override
+  public void initialize(Configuration conf) {
+    for (int i = 0; i < elementTypes.size(); i++) {
+      elementTypes.get(i).initialize(conf);
+    }
+  }
+
+  /**
+   * Returns a new {@code Union} with the same index whose value is the detached copy produced
+   * by the matching element type, or {@code null} when {@code source} is {@code null}.
+   */
+  @Override
+  public Union deepCopy(Union source) {
+    if (source != null) {
+      int sourceIndex = source.getIndex();
+      PType elementType = elementTypes.get(sourceIndex);
+      return new Union(sourceIndex, elementType.getDetachedValue(source.getValue()));
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
index e09e173..ba8add6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
@@ -29,6 +29,7 @@ import org.apache.crunch.Tuple;
 import org.apache.crunch.Tuple3;
 import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
+import org.apache.crunch.Union;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
@@ -161,4 +162,9 @@ public class AvroTypeFamily implements PTypeFamily {
   public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
     return Avros.derived(clazz, inputFn, outputFn, base);
   }
+
+  /** Delegates to {@link Avros#unionOf(PType...)}. */
+  @Override
+  public PType<Union> unionOf(PType<?>... ptypes) {
+    return Avros.unionOf(ptypes);
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 2cf63e8..8f1dae0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -48,6 +48,7 @@ import org.apache.crunch.Tuple;
 import org.apache.crunch.Tuple3;
 import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
+import org.apache.crunch.Union;
 import org.apache.crunch.fn.CompositeMapFn;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.types.CollectionDeepCopier;
@@ -58,6 +59,7 @@ import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypes;
 import org.apache.crunch.types.TupleDeepCopier;
 import org.apache.crunch.types.TupleFactory;
+import org.apache.crunch.types.UnionDeepCopier;
 import org.apache.crunch.types.writable.WritableDeepCopier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
@@ -649,6 +651,142 @@ public class Avros {
         ptypes), new TupleDeepCopier(clazz, ptypes), null, ptypes);
   }
 
+  /**
+   * Converts the Avro record form of a union back into a {@link Union}: field 0 is read as the
+   * index, and field 1 is passed through the input function of the element type at that index.
+   */
+  private static class UnionRecordToTuple extends MapFn<GenericRecord, Union> {
+    private final List<MapFn> fns;
+
+    public UnionRecordToTuple(PType<?>... ptypes) {
+      this.fns = Lists.newArrayList();
+      for (int i = 0; i < ptypes.length; i++) {
+        fns.add(((AvroType) ptypes[i]).getInputMapFn());
+      }
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      for (MapFn elementFn : fns) {
+        elementFn.configure(conf);
+      }
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (MapFn elementFn : fns) {
+        elementFn.setContext(context);
+      }
+    }
+
+    @Override
+    public void initialize() {
+      for (MapFn elementFn : fns) {
+        elementFn.initialize();
+      }
+    }
+
+    @Override
+    public Union map(GenericRecord input) {
+      int position = (Integer) input.get(0);
+      // Select the element function first, then read the raw value it will convert.
+      MapFn elementFn = fns.get(position);
+      Object rawValue = input.get(1);
+      return new Union(position, elementFn.map(rawValue));
+    }
+  }
+
+  /**
+   * Converts a {@link Union} into its Avro record form: field 0 holds the union's index and
+   * field 1 holds the value after it has been passed through the output function of the element
+   * type at that index.
+   */
+  private static class TupleToUnionRecord extends MapFn<Union, GenericRecord> {
+    private final List<MapFn> fns;
+    private final List<AvroType> avroTypes;
+    // The schema is carried as JSON and re-parsed in initialize(); the parsed form is transient.
+    private final String jsonSchema;
+    private final boolean isReflect;
+    private transient Schema schema;
+
+    /**
+     * @param schema the record schema used for the records produced by {@link #map(Union)}
+     * @param ptypes the element types; each one is cast to {@code AvroType}
+     */
+    public TupleToUnionRecord(Schema schema, PType<?>... ptypes) {
+      this.fns = Lists.newArrayList();
+      this.avroTypes = Lists.newArrayList();
+      this.jsonSchema = schema.toString();
+      boolean reflectFound = false;
+      boolean specificFound = false;
+      for (PType ptype : ptypes) {
+        AvroType atype = (AvroType) ptype;
+        fns.add(atype.getOutputMapFn());
+        avroTypes.add(atype);
+        if (atype.hasReflect()) {
+          reflectFound = true;
+        }
+        if (atype.hasSpecific()) {
+          specificFound = true;
+        }
+      }
+      if (specificFound && reflectFound) {
+        checkCombiningSpecificAndReflectionSchemas();
+      }
+      this.isReflect = reflectFound;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      for (MapFn fn : fns) {
+        fn.configure(conf);
+      }
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      // Forward the supplied context itself. This override never stores the argument, so
+      // getContext() cannot be relied on to return it.
+      for (MapFn fn : fns) {
+        fn.setContext(context);
+      }
+    }
+
+    @Override
+    public void initialize() {
+      this.schema = new Schema.Parser().parse(jsonSchema);
+      for (MapFn fn : fns) {
+        fn.initialize();
+      }
+    }
+
+    /** Creates an empty record of the flavor chosen in the constructor (reflect or generic). */
+    private GenericRecord createRecord() {
+      if (isReflect) {
+        return new ReflectGenericRecord(schema);
+      } else {
+        return new GenericData.Record(schema);
+      }
+    }
+
+    @Override
+    public GenericRecord map(Union input) {
+      GenericRecord record = createRecord();
+      int index = input.getIndex();
+      record.put(0, index);
+      record.put(1, fns.get(index).map(input.getValue()));
+      return record;
+    }
+  }
+
+  /**
+   * Creates a {@code PType} for {@link Union} values whose elements are described by the given
+   * ptypes, each of which is cast to {@code AvroType}. The backing schema is a record with an
+   * int "index" field and a "value" field that is an Avro union of the distinct element schemas;
+   * the record name is "union" followed by a Base64 encoding of an MD5 digest of those schemas.
+   */
+  public static PType<Union> unionOf(PType<?>... ptypes) {
+    List<Schema> distinctSchemas = Lists.newArrayList();
+    MessageDigest md;
+    try {
+      md = MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+    for (PType<?> ptype : ptypes) {
+      Schema elementSchema = ((AvroType) ptype).getSchema();
+      if (distinctSchemas.contains(elementSchema)) {
+        // Already part of the value union; it must not be added or digested twice.
+        continue;
+      }
+      distinctSchemas.add(elementSchema);
+      md.update(elementSchema.toString().getBytes(Charsets.UTF_8));
+    }
+    Schema indexSchema = Schema.create(Type.INT);
+    Schema valueSchema = Schema.createUnion(distinctSchemas);
+    List<Schema.Field> fields = Lists.newArrayList(
+        new Schema.Field("index", indexSchema, "", null),
+        new Schema.Field("value", valueSchema, "", null));
+
+    String encodedDigest = Base64.encodeBase64URLSafeString(md.digest());
+    String schemaName = "union" + encodedDigest.replace('-', 'x');
+    Schema schema = Schema.createRecord(schemaName, "", "crunch", false);
+    schema.setFields(fields);
+    return new AvroType<Union>(Union.class, schema, new UnionRecordToTuple(ptypes),
+        new TupleToUnionRecord(schema, ptypes), new UnionDeepCopier(ptypes), null, ptypes);
+  }
+
   private static Schema createTupleSchema(PType<?>... ptypes) throws RuntimeException {
     // Guarantee each tuple schema has a globally unique name
     List<Schema.Field> fields = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java
new file mode 100644
index 0000000..b88632a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A {@code WritableComparable} made of an integer index and a {@code BytesWritable} value.
+ * Instances order by index first and then by value, and {@code equals}/{@code hashCode} follow
+ * the same two fields.
+ */
+public class UnionWritable implements WritableComparable<UnionWritable> {
+
+  private int index;
+  private BytesWritable value;
+
+  public UnionWritable() {
+    // no-arg constructor for writables
+  }
+
+  public UnionWritable(int index, BytesWritable value) {
+    this.index = index;
+    this.value = value;
+  }
+
+  public int getIndex() {
+    return index;
+  }
+
+  public BytesWritable getValue() {
+    return value;
+  }
+
+  /**
+   * Orders by index, then by value when the indices match.
+   */
+  @Override
+  public int compareTo(UnionWritable other) {
+    int otherIndex = other.getIndex();
+    if (index != otherIndex) {
+      // Compare explicitly rather than subtracting, which can overflow for extreme values.
+      return index < otherIndex ? -1 : 1;
+    }
+    return value.compareTo(other.getValue());
+  }
+
+  /**
+   * Equal when the other object is a {@code UnionWritable} with the same index and an equal
+   * (or equally absent) value.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof UnionWritable)) {
+      return false;
+    }
+    UnionWritable that = (UnionWritable) o;
+    if (index != that.index) {
+      return false;
+    }
+    return value == null ? that.value == null : value.equals(that.value);
+  }
+
+  /**
+   * Content-based hash, so that equal instances hash alike even when they are distinct objects.
+   */
+  @Override
+  public int hashCode() {
+    return 31 * index + (value == null ? 0 : value.hashCode());
+  }
+
+  /**
+   * Writes the index as a variable-length int followed by the value.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, index);
+    value.write(out);
+  }
+
+  /**
+   * Reads the index and then the value, allocating the value holder on first use.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.index = WritableUtils.readVInt(in);
+    if (value == null) {
+      value = new BytesWritable();
+    }
+    value.readFields(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
index a94db96..5754b4d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
@@ -27,6 +27,7 @@ import org.apache.crunch.Tuple;
 import org.apache.crunch.Tuple3;
 import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
+import org.apache.crunch.Union;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
@@ -144,4 +145,9 @@ public class WritableTypeFamily implements PTypeFamily {
   public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
     return Writables.derived(clazz, inputFn, outputFn, base);
   }
+
+  /** Delegates to {@link Writables#unionOf(PType...)}. */
+  @Override
+  public PType<Union> unionOf(PType<?>... ptypes) {
+    return Writables.unionOf(ptypes);
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
index 0273e5e..d8ad6ca 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -32,6 +32,7 @@ import org.apache.crunch.Tuple;
 import org.apache.crunch.Tuple3;
 import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
+import org.apache.crunch.Union;
 import org.apache.crunch.fn.CompositeMapFn;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.types.PType;
@@ -288,7 +289,7 @@ public class Writables {
     }
     return instance;
   }
-  
+
   /**
    * For mapping from {@link TupleWritable} instances to {@link Tuple}s.
    * 
@@ -455,6 +456,106 @@ public class Writables {
     return new WritableType(clazz, TupleWritable.class, input, output, ptypes);
   }
 
+  /**
+   * For mapping from {@link UnionWritable} instances to {@link Union}s. The wrapped value is
+   * turned into a Writable of the serialization class registered for the union's index and then
+   * passed through that element type's input function.
+   */
+  private static class UWInputFn extends MapFn<UnionWritable, Union> {
+    private final List<MapFn> fns;
+    private final List<Class<Writable>> writableClasses;
+
+    public UWInputFn(WritableType<?, ?>... ptypes) {
+      this.fns = Lists.newArrayList();
+      this.writableClasses = Lists.newArrayList();
+      for (int i = 0; i < ptypes.length; i++) {
+        WritableType elementType = ptypes[i];
+        fns.add(elementType.getInputMapFn());
+        writableClasses.add(elementType.getSerializationClass());
+      }
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      for (MapFn elementFn : fns) {
+        elementFn.configure(conf);
+      }
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (MapFn elementFn : fns) {
+        elementFn.setContext(context);
+      }
+    }
+
+    @Override
+    public void initialize() {
+      for (MapFn elementFn : fns) {
+        elementFn.initialize();
+      }
+    }
+
+    @Override
+    public Union map(UnionWritable in) {
+      int position = in.getIndex();
+      Class<Writable> writableClass = writableClasses.get(position);
+      Writable element = create(writableClass, in.getValue());
+      return new Union(position, fns.get(position).map(element));
+    }
+  }
+
+  /**
+   * For mapping from {@link Union}s to {@link UnionWritable}s. The union's value is passed
+   * through the output function of the element type at its index, and the resulting Writable is
+   * stored as serialized bytes.
+   */
+  private static class UWOutputFn extends MapFn<Union, UnionWritable> {
+
+    private final List<MapFn> fns;
+
+    public UWOutputFn(PType<?>... ptypes) {
+      this.fns = Lists.newArrayList();
+      for (int i = 0; i < ptypes.length; i++) {
+        fns.add(ptypes[i].getOutputMapFn());
+      }
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      for (MapFn elementFn : fns) {
+        elementFn.configure(conf);
+      }
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (MapFn elementFn : fns) {
+        elementFn.setContext(context);
+      }
+    }
+
+    @Override
+    public void initialize() {
+      for (MapFn elementFn : fns) {
+        elementFn.initialize();
+      }
+    }
+
+    @Override
+    public UnionWritable map(Union input) {
+      int position = input.getIndex();
+      MapFn elementFn = fns.get(position);
+      Writable element = (Writable) elementFn.map(input.getValue());
+      byte[] serialized = WritableUtils.toByteArray(element);
+      return new UnionWritable(position, new BytesWritable(serialized));
+    }
+  }
+
+  /**
+   * Creates a {@code PType} for {@link Union} values that are serialized as
+   * {@link UnionWritable}s. Every element type is cast to {@link WritableType}.
+   */
+  public static PType<Union> unionOf(PType<?>... ptypes) {
+    WritableType[] writableTypes = new WritableType[ptypes.length];
+    int position = 0;
+    for (PType<?> ptype : ptypes) {
+      writableTypes[position++] = (WritableType) ptype;
+    }
+    UWInputFn fromWritable = new UWInputFn(writableTypes);
+    UWOutputFn toWritable = new UWOutputFn(ptypes);
+    return new WritableType(Union.class, UnionWritable.class, fromWritable, toWritable, ptypes);
+  }
+
   public static <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
     WritableType<S, ?> wt = (WritableType<S, ?>) base;
     MapFn input = new CompositeMapFn(wt.getInputMapFn(), inputFn);