You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/01/09 16:40:09 UTC
git commit: CRUNCH-239: Add a Union PType.
Updated Branches:
refs/heads/master 64c20ad9c -> 52da56301
CRUNCH-239: Add a Union PType.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/52da5630
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/52da5630
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/52da5630
Branch: refs/heads/master
Commit: 52da56301fac9cdc0baac0ee9b8fd421a5147baa
Parents: 64c20ad
Author: Josh Wills <jw...@apache.org>
Authored: Tue Jan 7 17:28:48 2014 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Jan 8 09:03:54 2014 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/crunch/Union.java | 65 +++++++++
.../java/org/apache/crunch/lib/Cogroup.java | 45 +++---
.../org/apache/crunch/types/PTypeFamily.java | 3 +
.../apache/crunch/types/UnionDeepCopier.java | 49 +++++++
.../crunch/types/avro/AvroTypeFamily.java | 6 +
.../org/apache/crunch/types/avro/Avros.java | 138 +++++++++++++++++++
.../crunch/types/writable/UnionWritable.java | 72 ++++++++++
.../types/writable/WritableTypeFamily.java | 6 +
.../apache/crunch/types/writable/Writables.java | 103 +++++++++++++-
9 files changed, 459 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/Union.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Union.java b/crunch-core/src/main/java/org/apache/crunch/Union.java
new file mode 100644
index 0000000..6db1657
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Union.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * Allows us to represent the combination of multiple data sources that may contain different types of data
+ * as a single type with an index to indicate which of the original sources the current record was from.
+ */
+public class Union {
+ // Immutable pair: the index of the originating source plus the record value from that source.
+ private final int index;
+ private final Object value;
+
+ public Union(int index, Object value) {
+ this.index = index;
+ this.value = value; // stored as-is; equals() and hashCode() both tolerate a null value
+ }
+
+ /**
+ * Returns the index of the original data source for this union type.
+ */
+ public int getIndex() {
+ return index;
+ }
+
+ /**
+ * Returns the underlying object value of the record.
+ */
+ public Object getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false; // exact class match only
+ // Same runtime class from here on, so the cast below cannot fail.
+ Union that = (Union) o;
+ // Equal only when both the source index and the (possibly null) value agree.
+ if (index != that.index) return false;
+ if (value != null ? !value.equals(that.value) : that.value != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * index + (value != null ? value.hashCode() : 0); // a null value contributes 0
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
index 9efcb5e..8743a29 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
@@ -27,6 +27,7 @@ import org.apache.crunch.Tuple;
import org.apache.crunch.Tuple3;
import org.apache.crunch.Tuple4;
import org.apache.crunch.TupleN;
+import org.apache.crunch.Union;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.TupleFactory;
@@ -211,20 +212,18 @@ public class Cogroup {
for (int i = 0; i < rest.length; i++) {
ptypes[i + 1] = rest[i].getValueType();
}
- PType<TupleN> itype = ptf.tuples(ptypes);
+ PType<Union> itype = ptf.unionOf(ptypes);
- PTable<K, TupleN> firstInter = first.mapValues("coGroupTag1",
- new CogroupFn(0, 1 + rest.length),
- itype);
- PTable<K, TupleN>[] inter = new PTable[rest.length];
+ PTable<K, Union> firstInter = first.mapValues("coGroupTag1",
+ new CogroupFn(0), itype);
+ PTable<K, Union>[] inter = new PTable[rest.length];
for (int i = 0; i < rest.length; i++) {
inter[i] = rest[i].mapValues("coGroupTag" + (i + 2),
- new CogroupFn(i + 1, 1 + rest.length),
- itype);
+ new CogroupFn(i + 1), itype);
}
- PTable<K, TupleN> union = firstInter.union(inter);
- PGroupedTable<K, TupleN> grouped;
+ PTable<K, Union> union = firstInter.union(inter);
+ PGroupedTable<K, Union> grouped;
if (numReducers > 0) {
grouped = union.groupByKey(numReducers);
} else {
@@ -236,25 +235,21 @@ public class Cogroup {
outputType);
}
- private static class CogroupFn<T> extends MapFn<T, TupleN> {
+ private static class CogroupFn<T> extends MapFn<T, Union> {
private final int index;
- private final int size;
-
- CogroupFn(int index, int size) {
+
+ CogroupFn(int index) {
this.index = index;
- this.size = size;
}
@Override
- public TupleN map(T input) {
- Object[] v = new Object[size];
- v[index] = input;
- return TupleN.of(v);
+ public Union map(T input) {
+ return new Union(index, input);
}
}
private static class PostGroupFn<T extends Tuple> extends
- MapFn<Iterable<TupleN>, T> {
+ MapFn<Iterable<Union>, T> {
private final TupleFactory factory;
private final PType[] ptypes;
@@ -273,18 +268,14 @@ public class Cogroup {
}
@Override
- public T map(Iterable<TupleN> input) {
+ public T map(Iterable<Union> input) {
Collection[] collections = new Collection[ptypes.length];
for (int i = 0; i < ptypes.length; i++) {
collections[i] = Lists.newArrayList();
}
- for (TupleN t : input) {
- for (int i = 0; i < ptypes.length; i++) {
- if (t.get(i) != null) {
- collections[i].add(ptypes[i].getDetachedValue(t.get(i)));
- break;
- }
- }
+ for (Union t : input) {
+ int index = t.getIndex();
+ collections[index].add(ptypes[index].getDetachedValue(t.getValue()));
}
return (T) factory.makeTuple(collections);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java
index 9458f14..0ad324a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java
@@ -27,6 +27,7 @@ import org.apache.crunch.Tuple;
import org.apache.crunch.Tuple3;
import org.apache.crunch.Tuple4;
import org.apache.crunch.TupleN;
+import org.apache.crunch.Union;
/**
* An abstract factory for creating {@code PType} instances that have the same
@@ -68,6 +69,8 @@ public interface PTypeFamily {
<S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base);
+ PType<Union> unionOf(PType<?>... ptypes); // PType for Union values; a Union's index selects the matching entry of ptypes
+
<K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value);
/**
http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java
new file mode 100644
index 0000000..ba712e0
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import com.google.common.collect.Lists;
+import org.apache.crunch.Union;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+
+public class UnionDeepCopier implements DeepCopier<Union> {
+ private final List<PType> elementTypes; // one PType per union member, looked up by Union#getIndex()
+
+ public UnionDeepCopier(PType... elementTypes) {
+ this.elementTypes = Lists.newArrayList(elementTypes); // copies the varargs array into a list
+ }
+
+ @Override
+ public void initialize(Configuration conf) {
+ for (PType elementType : elementTypes) {
+ elementType.initialize(conf); // every member type is initialized with the same configuration
+ }
+ }
+
+ @Override
+ public Union deepCopy(Union source) {
+ if (source == null) {
+ return null; // nothing to copy
+ }
+ int index = source.getIndex();
+ Object copy = elementTypes.get(index).getDetachedValue(source.getValue()); // detach via the member's own PType
+ return new Union(index, copy); // the copy keeps the source's index
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
index e09e173..ba8add6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
@@ -29,6 +29,7 @@ import org.apache.crunch.Tuple;
import org.apache.crunch.Tuple3;
import org.apache.crunch.Tuple4;
import org.apache.crunch.TupleN;
+import org.apache.crunch.Union;
import org.apache.crunch.types.PGroupedTableType;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
@@ -161,4 +162,9 @@ public class AvroTypeFamily implements PTypeFamily {
public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
return Avros.derived(clazz, inputFn, outputFn, base);
}
+
+ @Override
+ public PType<Union> unionOf(PType<?>... ptypes) {
+ return Avros.unionOf(ptypes); // delegates to the static factory in Avros
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 2cf63e8..8f1dae0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -48,6 +48,7 @@ import org.apache.crunch.Tuple;
import org.apache.crunch.Tuple3;
import org.apache.crunch.Tuple4;
import org.apache.crunch.TupleN;
+import org.apache.crunch.Union;
import org.apache.crunch.fn.CompositeMapFn;
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.types.CollectionDeepCopier;
@@ -58,6 +59,7 @@ import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypes;
import org.apache.crunch.types.TupleDeepCopier;
import org.apache.crunch.types.TupleFactory;
+import org.apache.crunch.types.UnionDeepCopier;
import org.apache.crunch.types.writable.WritableDeepCopier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
@@ -649,6 +651,142 @@ public class Avros {
ptypes), new TupleDeepCopier(clazz, ptypes), null, ptypes);
}
+ private static class UnionRecordToTuple extends MapFn<GenericRecord, Union> {
+ private final List<MapFn> fns; // input map function of each member type, in ptypes order
+ // Every ptype is cast to AvroType, so a non-Avro ptype fails here with a ClassCastException.
+ public UnionRecordToTuple(PType<?>... ptypes) {
+ this.fns = Lists.newArrayList();
+ for (PType<?> ptype : ptypes) {
+ AvroType atype = (AvroType) ptype;
+ fns.add(atype.getInputMapFn());
+ }
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ for (MapFn fn : fns) {
+ fn.configure(conf); // lifecycle calls are forwarded to every member function
+ }
+ }
+
+ @Override
+ public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+ for (MapFn fn : fns) {
+ fn.setContext(context);
+ }
+ }
+
+ @Override
+ public void initialize() {
+ for (MapFn fn : fns) {
+ fn.initialize();
+ }
+ }
+
+ @Override
+ public Union map(GenericRecord input) {
+ int index = (Integer) input.get(0); // field 0 holds the member index
+ return new Union(index, fns.get(index).map(input.get(1))); // field 1 holds the value, decoded by that member's function
+ }
+ }
+
+ private static class TupleToUnionRecord extends MapFn<Union, GenericRecord> {
+ private final List<MapFn> fns; // output map function of each member type, in ptypes order
+ private final List<AvroType> avroTypes; // filled by the constructor; not read elsewhere in this class
+ private final String jsonSchema; // record schema kept as JSON text and re-parsed in initialize()
+ private final boolean isReflect; // true when any member type reports hasReflect()
+ private transient Schema schema;
+ // Gathers the member output functions and notes whether reflect and/or specific types are present.
+ public TupleToUnionRecord(Schema schema, PType<?>... ptypes) {
+ this.fns = Lists.newArrayList();
+ this.avroTypes = Lists.newArrayList();
+ this.jsonSchema = schema.toString();
+ boolean reflectFound = false;
+ boolean specificFound = false;
+ for (PType ptype : ptypes) {
+ AvroType atype = (AvroType) ptype;
+ fns.add(atype.getOutputMapFn());
+ avroTypes.add(atype);
+ if (atype.hasReflect()) {
+ reflectFound = true;
+ }
+ if (atype.hasSpecific()) {
+ specificFound = true;
+ }
+ }
+ if (specificFound && reflectFound) {
+ checkCombiningSpecificAndReflectionSchemas();
+ }
+ this.isReflect = reflectFound;
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ for (MapFn fn : fns) {
+ fn.configure(conf);
+ }
+ }
+
+ @Override
+ public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+ for (MapFn fn : fns) {
+ fn.setContext(context); // forward the context we were handed, as UnionRecordToTuple does; previously getContext() was passed
+ }
+ }
+
+ @Override
+ public void initialize() {
+ this.schema = new Schema.Parser().parse(jsonSchema); // rebuild the transient schema from its JSON text
+ for (MapFn fn : fns) {
+ fn.initialize();
+ }
+ }
+ // Creates an empty record for the union schema; the reflect-aware record is used when isReflect is set.
+ private GenericRecord createRecord() {
+ if (isReflect) {
+ return new ReflectGenericRecord(schema);
+ } else {
+ return new GenericData.Record(schema);
+ }
+ }
+
+ @Override
+ public GenericRecord map(Union input) {
+ GenericRecord record = createRecord();
+ int index = input.getIndex();
+ record.put(0, index); // field 0: which member this value belongs to
+ record.put(1, fns.get(index).map(input.getValue())); // field 1: value encoded by that member's output function
+ return record;
+ }
+ }
+
+ public static PType<Union> unionOf(PType<?>... ptypes) {
+ List<Schema> schemas = Lists.newArrayList(); // distinct member schemas, in first-seen order
+ MessageDigest md; // digest of the member schemas, used below to name the record schema
+ try {
+ md = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ for (int i = 0; i < ptypes.length; i++) {
+ AvroType atype = (AvroType) ptypes[i];
+ Schema schema = atype.getSchema();
+ if (!schemas.contains(schema)) { // a schema shared by several members is listed and hashed once
+ schemas.add(schema);
+ md.update(schema.toString().getBytes(Charsets.UTF_8));
+ }
+ }
+ List<Schema.Field> fields = Lists.newArrayList(
+ new Schema.Field("index", Schema.create(Type.INT), "", null),
+ new Schema.Field("value", Schema.createUnion(schemas), "", null));
+ // Record name: "union" + URL-safe Base64 of the digest, with every '-' replaced by 'x'.
+ String schemaName = "union" + Base64.encodeBase64URLSafeString(md.digest()).replace('-', 'x');
+ Schema schema = Schema.createRecord(schemaName, "", "crunch", false);
+ schema.setFields(fields);
+ return new AvroType<Union>(Union.class, schema, new UnionRecordToTuple(ptypes),
+ new TupleToUnionRecord(schema, ptypes), new UnionDeepCopier(ptypes), null, ptypes);
+ }
+
private static Schema createTupleSchema(PType<?>... ptypes) throws RuntimeException {
// Guarantee each tuple schema has a globally unique name
List<Schema.Field> fields = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java
new file mode 100644
index 0000000..b88632a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class UnionWritable implements WritableComparable<UnionWritable> {
+ // Wire form of a union value: the member index plus the member's serialized bytes.
+ private int index;
+ private BytesWritable value;
+
+ public UnionWritable() {
+ // no-arg constructor for writables
+ }
+
+ public UnionWritable(int index, BytesWritable value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public BytesWritable getValue() {
+ return value;
+ }
+
+ @Override
+ public int compareTo(UnionWritable other) {
+ if (index == other.getIndex()) {
+ return value.compareTo(other.getValue()); // same member: order by the serialized bytes
+ }
+ return index < other.getIndex() ? -1 : 1; // explicit comparison; int subtraction can overflow and flip the sign
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, index); // index first, as a variable-length int
+ value.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.index = WritableUtils.readVInt(in);
+ if (value == null) {
+ value = new BytesWritable(); // instance came from the no-arg constructor
+ }
+ value.readFields(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
index a94db96..5754b4d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
@@ -27,6 +27,7 @@ import org.apache.crunch.Tuple;
import org.apache.crunch.Tuple3;
import org.apache.crunch.Tuple4;
import org.apache.crunch.TupleN;
+import org.apache.crunch.Union;
import org.apache.crunch.types.PGroupedTableType;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
@@ -144,4 +145,9 @@ public class WritableTypeFamily implements PTypeFamily {
public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
return Writables.derived(clazz, inputFn, outputFn, base);
}
+
+ @Override
+ public PType<Union> unionOf(PType<?>... ptypes) {
+ return Writables.unionOf(ptypes); // delegates to the static factory in Writables
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
index 0273e5e..d8ad6ca 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -32,6 +32,7 @@ import org.apache.crunch.Tuple;
import org.apache.crunch.Tuple3;
import org.apache.crunch.Tuple4;
import org.apache.crunch.TupleN;
+import org.apache.crunch.Union;
import org.apache.crunch.fn.CompositeMapFn;
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.types.PType;
@@ -288,7 +289,7 @@ public class Writables {
}
return instance;
}
-
+
/**
* For mapping from {@link TupleWritable} instances to {@link Tuple}s.
*
@@ -455,6 +456,106 @@ public class Writables {
return new WritableType(clazz, TupleWritable.class, input, output, ptypes);
}
+ /**
+ * For mapping from {@link UnionWritable} instances to {@link Union}s.
+ * The writable's index selects the member class and input function used to decode its bytes.
+ */
+ private static class UWInputFn extends MapFn<UnionWritable, Union> {
+ private final List<MapFn> fns; // input map function of each member type, in ptypes order
+ private final List<Class<Writable>> writableClasses; // serialization class of each member type, same order
+
+ public UWInputFn(WritableType<?, ?>... ptypes) {
+ this.fns = Lists.newArrayList();
+ this.writableClasses = Lists.newArrayList();
+ for (WritableType ptype : ptypes) {
+ fns.add(ptype.getInputMapFn());
+ writableClasses.add(ptype.getSerializationClass());
+ }
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ for (MapFn fn : fns) {
+ fn.configure(conf);
+ }
+ }
+
+ @Override
+ public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+ for (MapFn fn : fns) {
+ fn.setContext(context);
+ }
+ }
+
+ @Override
+ public void initialize() {
+ for (MapFn fn : fns) {
+ fn.initialize();
+ }
+ }
+
+ @Override
+ public Union map(UnionWritable in) {
+ int index = in.getIndex();
+ Writable w = create(writableClasses.get(index), in.getValue()); // presumably rebuilds the member writable from the bytes; create() is defined outside this hunk
+ return new Union(index, fns.get(index).map(w));
+ }
+ }
+
+ /**
+ * For mapping from {@code Union}s to {@code UnionWritable}s.
+ * The value is converted by the output function at the union's index, then stored as raw bytes.
+ */
+ private static class UWOutputFn extends MapFn<Union, UnionWritable> {
+
+ private final List<MapFn> fns; // output map function of each member type, in ptypes order
+
+ public UWOutputFn(PType<?>... ptypes) {
+ this.fns = Lists.newArrayList();
+ for (PType<?> ptype : ptypes) {
+ fns.add(ptype.getOutputMapFn());
+ }
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ for (MapFn fn : fns) {
+ fn.configure(conf);
+ }
+ }
+
+ @Override
+ public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+ for (MapFn fn : fns) {
+ fn.setContext(context);
+ }
+ }
+
+ @Override
+ public void initialize() {
+ for (MapFn fn : fns) {
+ fn.initialize();
+ }
+ }
+
+ @Override
+ public UnionWritable map(Union input) {
+ int index = input.getIndex();
+ Writable w = (Writable) fns.get(index).map(input.getValue()); // the member's output function must yield a Writable
+ return new UnionWritable(index, new BytesWritable(WritableUtils.toByteArray(w))); // wrap the serialized bytes with the index
+ }
+ }
+
+ public static PType<Union> unionOf(PType<?>... ptypes) {
+ WritableType[] writableTypes = new WritableType[ptypes.length]; // every member must be a WritableType
+ for (int idx = 0; idx < ptypes.length; idx++) {
+ writableTypes[idx] = (WritableType) ptypes[idx];
+ }
+ UWInputFn inputFn = new UWInputFn(writableTypes); // UnionWritable -> Union
+ UWOutputFn outputFn = new UWOutputFn(ptypes); // Union -> UnionWritable
+ return new WritableType(Union.class, UnionWritable.class, inputFn, outputFn, ptypes);
+ }
+
public static <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
WritableType<S, ?> wt = (WritableType<S, ?>) base;
MapFn input = new CompositeMapFn(wt.getInputMapFn(), inputFn);