You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/11/26 04:27:10 UTC
git commit: Second cut at rewriting custom Writable types to a more
compact format.
Updated Branches:
refs/heads/master 2a8b6c149 -> c51ef57ae
Second cut at rewriting custom Writable types to a more compact format.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/c51ef57a
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/c51ef57a
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/c51ef57a
Branch: refs/heads/master
Commit: c51ef57aeecb78901e7489728367c2921f4c08d7
Parents: 2a8b6c1
Author: Josh Wills <jw...@apache.org>
Authored: Wed Feb 27 21:34:44 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Nov 21 19:50:32 2013 -0800
----------------------------------------------------------------------
.../main/java/org/apache/crunch/lib/Sort.java | 7 +-
.../lib/sort/TupleWritableComparator.java | 66 ++++++----
.../types/writable/GenericArrayWritable.java | 35 ++----
.../crunch/types/writable/TextMapWritable.java | 41 ++-----
.../crunch/types/writable/TupleWritable.java | 65 +++++-----
.../apache/crunch/types/writable/Writables.java | 119 ++++++++++++-------
.../lib/TupleWritablePartitionerTest.java | 32 +----
.../writable/GenericArrayWritableTest.java | 25 ++--
.../crunch/types/writable/WritableTypeTest.java | 2 +-
.../crunch/types/writable/WritablesTest.java | 37 ++++--
10 files changed, 229 insertions(+), 200 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java b/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
index 94ce7d8..011d9cd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
@@ -40,6 +40,7 @@ import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableType;
import org.apache.crunch.types.writable.WritableTypeFamily;
import org.apache.crunch.util.PartitionUtils;
import org.apache.hadoop.conf.Configuration;
@@ -252,7 +253,11 @@ public class Sort {
if (columnOrders.length == 1 && columnOrders[0].order == Order.DESCENDING) {
builder.sortComparatorClass(ReverseWritableComparator.class);
} else {
- TupleWritableComparator.configureOrdering(conf, columnOrders);
+ WritableType[] wt = new WritableType[columnOrders.length];
+ for (int i = 0; i < wt.length; i++) {
+ wt[i] = (WritableType) keyType.getSubTypes().get(i);
+ }
+ TupleWritableComparator.configureOrdering(conf, wt, columnOrders);
builder.sortComparatorClass(TupleWritableComparator.class);
}
} else if (tf == AvroTypeFamily.getInstance()) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
index 07ee5b5..9677fc1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
@@ -17,13 +17,21 @@
*/
package org.apache.crunch.lib.sort;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
+import com.google.common.collect.Lists;
+import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.lib.Sort.ColumnOrder;
import org.apache.crunch.lib.Sort.Order;
import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.crunch.types.writable.WritableType;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
@@ -31,36 +39,29 @@ import org.apache.hadoop.io.WritableComparator;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
+import org.apache.hadoop.io.WritableFactories;
public class TupleWritableComparator extends WritableComparator implements Configurable {
private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
private Configuration conf;
+ Writable[] w1;
+ Writable[] w2;
private ColumnOrder[] columnOrders;
public TupleWritableComparator() {
super(TupleWritable.class, true);
}
- public static void configureOrdering(Configuration conf, Order... orders) {
- conf.set(CRUNCH_ORDERING_PROPERTY,
- Joiner.on(",").join(Iterables.transform(Arrays.asList(orders), new Function<Order, String>() {
- @Override
- public String apply(Order o) {
- return o.name();
- }
- })));
- }
-
- public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) {
- conf.set(CRUNCH_ORDERING_PROPERTY,
- Joiner.on(",").join(Iterables.transform(Arrays.asList(columnOrders), new Function<ColumnOrder, String>() {
- @Override
- public String apply(ColumnOrder o) {
- return o.column() + ";" + o.order().name();
- }
- })));
+ public static void configureOrdering(Configuration conf, WritableType[] types, ColumnOrder[] columnOrders) {
+ List<String> ordering = Lists.newArrayList();
+ for (int i = 0; i < types.length; i++) {
+ Class<?> cls = types[i].getSerializationClass();
+ String order = columnOrders[i].order().name();
+ ordering.add(cls.getCanonicalName() + ";" + order);
+ }
+ conf.set(CRUNCH_ORDERING_PROPERTY, Joiner.on(",").join(ordering));
}
@Override
@@ -83,16 +84,22 @@ public class TupleWritableComparator extends WritableComparator implements Confi
} else if (!ta.has(index) && tb.has(index)) {
return -order;
} else {
- Writable v1 = ta.get(index);
- Writable v2 = tb.get(index);
+ BytesWritable v1 = ta.get(index);
+ BytesWritable v2 = tb.get(index);
if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
- if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
- int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
+ try {
+ w1[index].readFields(new DataInputStream(new ByteArrayInputStream(v1.getBytes())));
+ w2[index].readFields(new DataInputStream(new ByteArrayInputStream(v2.getBytes())));
+ } catch (IOException e) {
+ throw new CrunchRuntimeException(e);
+ }
+ if (w1[index] instanceof WritableComparable && w2[index] instanceof WritableComparable) {
+ int cmp = ((WritableComparable) w1[index]).compareTo((WritableComparable) w2[index]);
if (cmp != 0) {
return order * cmp;
}
} else {
- int cmp = v1.hashCode() - v2.hashCode();
+ int cmp = w1[index].hashCode() - w2[index].hashCode();
if (cmp != 0) {
return order * cmp;
}
@@ -115,11 +122,20 @@ public class TupleWritableComparator extends WritableComparator implements Confi
String ordering = conf.get(CRUNCH_ORDERING_PROPERTY);
String[] columnOrderNames = ordering.split(",");
columnOrders = new ColumnOrder[columnOrderNames.length];
+ w1 = new Writable[columnOrderNames.length];
+ w2 = new Writable[columnOrderNames.length];
for (int i = 0; i < columnOrders.length; i++) {
String[] split = columnOrderNames[i].split(";");
- int column = Integer.parseInt(split[0]);
+ String className = split[0];
+ try {
+ Class cls = Class.forName(className);
+ w1[i] = WritableFactories.newInstance(cls);
+ w2[i] = WritableFactories.newInstance(cls);
+ } catch (Exception e) {
+ throw new CrunchRuntimeException(e);
+ }
Order order = Order.valueOf(split[1]);
- columnOrders[i] = ColumnOrder.by(column, order);
+ columnOrders[i] = ColumnOrder.by(i + 1, order);
}
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
index 8b54008..9731ff4 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
@@ -23,20 +23,17 @@ import java.io.IOException;
import java.util.Arrays;
import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableUtils;
/**
* A {@link Writable} for marshalling/unmarshalling Collections. Note that
* element order is <em>undefined</em>!
*
- * @param <T> The value type
*/
-class GenericArrayWritable<T> implements Writable {
- private Writable[] values;
+class GenericArrayWritable implements Writable {
+ private BytesWritable[] values;
private Class<? extends Writable> valueClass;
public GenericArrayWritable(Class<? extends Writable> valueClass) {
@@ -47,43 +44,29 @@ class GenericArrayWritable<T> implements Writable {
// for deserialization
}
- public void set(Writable[] values) {
+ public void set(BytesWritable[] values) {
this.values = values;
}
- public Writable[] get() {
+ public BytesWritable[] get() {
return values;
}
public void readFields(DataInput in) throws IOException {
- values = new Writable[WritableUtils.readVInt(in)]; // construct values
+ values = new BytesWritable[WritableUtils.readVInt(in)]; // construct values
if (values.length > 0) {
int nulls = WritableUtils.readVInt(in);
if (nulls == values.length) {
return;
}
- String valueType = Text.readString(in);
- setValueType(valueType);
for (int i = 0; i < values.length - nulls; i++) {
- Writable value = WritableFactories.newInstance(valueClass);
+ BytesWritable value = new BytesWritable();
value.readFields(in); // read a value
values[i] = value; // store it in values
}
}
}
- protected void setValueType(String valueType) {
- if (valueClass == null) {
- try {
- valueClass = Class.forName(valueType).asSubclass(Writable.class);
- } catch (ClassNotFoundException e) {
- throw new CrunchRuntimeException(e);
- }
- } else if (!valueType.equals(valueClass.getName())) {
- throw new IllegalStateException("Incoming " + valueType + " is not " + valueClass);
- }
- }
-
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, values.length);
if (values.length > 0) {
@@ -95,10 +78,6 @@ class GenericArrayWritable<T> implements Writable {
}
WritableUtils.writeVInt(out, nulls);
if (values.length - nulls > 0) {
- if (valueClass == null) {
- throw new IllegalStateException("Value class not set by constructor or read");
- }
- Text.writeString(out, valueClass.getName());
for (int i = 0; i < values.length; i++) {
if (values[i] != null) {
values[i].write(out);
http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
index 1ab51df..d25bd82 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
@@ -23,63 +23,46 @@ import java.io.IOException;
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import com.google.common.collect.Maps;
-class TextMapWritable<T extends Writable> implements Writable {
+class TextMapWritable implements Writable {
- private Class<T> valueClazz;
- private final Map<Text, T> instance;
+ private final Map<Text, BytesWritable> instance;
public TextMapWritable() {
this.instance = Maps.newHashMap();
}
- public TextMapWritable(Class<T> valueClazz) {
- this.valueClazz = valueClazz;
- this.instance = Maps.newHashMap();
- }
-
- public void put(Text txt, T value) {
+ public void put(Text txt, BytesWritable value) {
instance.put(txt, value);
}
- public Set<Map.Entry<Text, T>> entrySet() {
+ public Set<Map.Entry<Text, BytesWritable>> entrySet() {
return instance.entrySet();
}
@Override
public void readFields(DataInput in) throws IOException {
instance.clear();
- try {
- this.valueClazz = (Class<T>) Class.forName(Text.readString(in));
- } catch (ClassNotFoundException e) {
- throw (IOException) new IOException("Failed map init").initCause(e);
- }
int entries = WritableUtils.readVInt(in);
- try {
- for (int i = 0; i < entries; i++) {
- Text txt = new Text();
- txt.readFields(in);
- T value = valueClazz.newInstance();
- value.readFields(in);
- instance.put(txt, value);
- }
- } catch (IllegalAccessException e) {
- throw (IOException) new IOException("Failed map init").initCause(e);
- } catch (InstantiationException e) {
- throw (IOException) new IOException("Failed map init").initCause(e);
+ for (int i = 0; i < entries; i++) {
+ Text txt = new Text();
+ txt.readFields(in);
+ BytesWritable value = new BytesWritable();
+ value.readFields(in);
+ instance.put(txt, value);
}
}
@Override
public void write(DataOutput out) throws IOException {
- Text.writeString(out, valueClazz.getName());
WritableUtils.writeVInt(out, instance.size());
- for (Map.Entry<Text, T> e : instance.entrySet()) {
+ for (Map.Entry<Text, BytesWritable> e : instance.entrySet()) {
e.getKey().write(out);
e.getValue().write(out);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
index 1c3536b..251e4f5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
@@ -17,14 +17,19 @@
*/
package org.apache.crunch.types.writable;
+import java.io.ByteArrayInputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.List;
import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.io.Text;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableUtils;
/**
@@ -35,8 +40,9 @@ import org.apache.hadoop.io.WritableUtils;
public class TupleWritable implements WritableComparable<TupleWritable> {
private long written;
- private Writable[] values;
-
+ private BytesWritable[] values;
+ private List<Class<Writable>> writableClasses;
+
/**
* Create an empty tuple with no allocated storage for writables.
*/
@@ -47,11 +53,15 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
* Initialize tuple with storage; unknown whether any of them contain
* "written" values.
*/
- public TupleWritable(Writable[] vals) {
+ public TupleWritable(BytesWritable[] vals) {
written = 0L;
values = vals;
}
+ public void setWritableClasses(List<Class<Writable>> writableClasses) {
+ this.writableClasses = writableClasses;
+ }
+
/**
* Return true if tuple has an element at the position provided.
*/
@@ -62,7 +72,7 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
/**
* Get ith Writable from Tuple.
*/
- public Writable get(int i) {
+ public BytesWritable get(int i) {
return values[i];
}
@@ -110,7 +120,19 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
public String toString() {
StringBuffer buf = new StringBuffer("[");
for (int i = 0; i < values.length; ++i) {
- buf.append(has(i) ? values[i].toString() : "");
+ if (has(i)) {
+ if (writableClasses != null) {
+ Writable w = WritableFactories.newInstance(writableClasses.get(i));
+ try {
+ w.readFields(new DataInputStream(new ByteArrayInputStream(values[i].getBytes())));
+ } catch (IOException e) {
+ throw new CrunchRuntimeException(e);
+ }
+ buf.append(w.toString());
+ } else {
+ buf.append(values[i].toString());
+ }
+ }
buf.append(",");
}
if (values.length != 0)
@@ -131,11 +153,6 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
WritableUtils.writeVLong(out, written);
for (int i = 0; i < values.length; ++i) {
if (has(i)) {
- Text.writeString(out, values[i].getClass().getName());
- }
- }
- for (int i = 0; i < values.length; ++i) {
- if (has(i)) {
values[i].write(out);
}
}
@@ -144,31 +161,15 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
/**
* {@inheritDoc}
*/
- @SuppressWarnings("unchecked")
- // No static typeinfo on Tuples
public void readFields(DataInput in) throws IOException {
int card = WritableUtils.readVInt(in);
- values = new Writable[card];
+ values = new BytesWritable[card];
written = WritableUtils.readVLong(in);
- Class<? extends Writable>[] cls = new Class[card];
- try {
- for (int i = 0; i < card; ++i) {
- if (has(i)) {
- cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
- }
- }
- for (int i = 0; i < card; ++i) {
- if (has(i)) {
- values[i] = cls[i].newInstance();
- values[i].readFields(in);
- }
+ for (int i = 0; i < card; ++i) {
+ if (has(i)) {
+ values[i] = new BytesWritable();
+ values[i].readFields(in);
}
- } catch (ClassNotFoundException e) {
- throw (IOException) new IOException("Failed tuple init").initCause(e);
- } catch (IllegalAccessException e) {
- throw (IOException) new IOException("Failed tuple init").initCause(e);
- } catch (InstantiationException e) {
- throw (IOException) new IOException("Failed tuple init").initCause(e);
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
index 78cf3ae..0273e5e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -17,11 +17,15 @@
*/
package org.apache.crunch.types.writable;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.MapFn;
import org.apache.crunch.Pair;
import org.apache.crunch.Tuple;
@@ -44,6 +48,8 @@ import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import com.google.common.collect.ImmutableMap;
@@ -273,6 +279,16 @@ public class Writables {
return new WritableTableType((WritableType) key, (WritableType) value);
}
+ private static <W extends Writable> W create(Class<W> clazz, BytesWritable bytes) {
+ W instance = (W) WritableFactories.newInstance(clazz);
+ try {
+ instance.readFields(new DataInputStream(new ByteArrayInputStream(bytes.getBytes())));
+ } catch (IOException e) {
+ throw new CrunchRuntimeException(e);
+ }
+ return instance;
+ }
+
/**
* For mapping from {@link TupleWritable} instances to {@link Tuple}s.
*
@@ -280,14 +296,17 @@ public class Writables {
private static class TWTupleMapFn extends MapFn<TupleWritable, Tuple> {
private final TupleFactory<?> tupleFactory;
private final List<MapFn> fns;
-
+ private final List<Class<Writable>> writableClasses;
+
private transient Object[] values;
- public TWTupleMapFn(TupleFactory<?> tupleFactory, PType<?>... ptypes) {
+ public TWTupleMapFn(TupleFactory<?> tupleFactory, WritableType<?, ?>... ptypes) {
this.tupleFactory = tupleFactory;
this.fns = Lists.newArrayList();
- for (PType ptype : ptypes) {
+ this.writableClasses = Lists.newArrayList();
+ for (WritableType ptype : ptypes) {
fns.add(ptype.getInputMapFn());
+ writableClasses.add(ptype.getSerializationClass());
}
}
@@ -321,7 +340,8 @@ public class Writables {
public Tuple map(TupleWritable in) {
for (int i = 0; i < values.length; i++) {
if (in.has(i)) {
- values[i] = fns.get(i).map(in.get(i));
+ Writable w = create(writableClasses.get(i), in.get(i));
+ values[i] = fns.get(i).map(w);
} else {
values[i] = null;
}
@@ -337,14 +357,17 @@ public class Writables {
private static class TupleTWMapFn extends MapFn<Tuple, TupleWritable> {
private transient TupleWritable writable;
- private transient Writable[] values;
+ private transient BytesWritable[] values;
private final List<MapFn> fns;
-
+ private final List<Class<Writable>> writableClasses;
+
public TupleTWMapFn(PType<?>... ptypes) {
this.fns = Lists.newArrayList();
+ this.writableClasses = Lists.newArrayList();
for (PType<?> ptype : ptypes) {
fns.add(ptype.getOutputMapFn());
+ writableClasses.add(((WritableType) ptype).getSerializationClass());
}
}
@@ -364,8 +387,9 @@ public class Writables {
@Override
public void initialize() {
- this.values = new Writable[fns.size()];
+ this.values = new BytesWritable[fns.size()];
this.writable = new TupleWritable(values);
+ this.writable.setWritableClasses(writableClasses);
for (MapFn fn : fns) {
fn.initialize();
}
@@ -378,7 +402,8 @@ public class Writables {
Object value = input.get(i);
if (value != null) {
writable.setWritten(i);
- values[i] = (Writable) fns.get(i).map(value);
+ Writable w = (Writable) fns.get(i).map(value);
+ values[i] = new BytesWritable(WritableUtils.toByteArray(w));
}
}
return writable;
@@ -386,38 +411,46 @@ public class Writables {
}
public static <V1, V2> WritableType<Pair<V1, V2>, TupleWritable> pairs(PType<V1> p1, PType<V2> p2) {
- TWTupleMapFn input = new TWTupleMapFn(TupleFactory.PAIR, p1, p2);
+ TWTupleMapFn input = new TWTupleMapFn(TupleFactory.PAIR, (WritableType) p1, (WritableType) p2);
TupleTWMapFn output = new TupleTWMapFn(p1, p2);
return new WritableType(Pair.class, TupleWritable.class, input, output, p1, p2);
}
public static <V1, V2, V3> WritableType<Tuple3<V1, V2, V3>, TupleWritable> triples(PType<V1> p1, PType<V2> p2,
PType<V3> p3) {
- TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE3, p1, p2, p3);
+ TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE3, (WritableType) p1,
+ (WritableType) p2, (WritableType) p3);
TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3);
return new WritableType(Tuple3.class, TupleWritable.class, input, output, p1, p2, p3);
}
public static <V1, V2, V3, V4> WritableType<Tuple4<V1, V2, V3, V4>, TupleWritable> quads(PType<V1> p1, PType<V2> p2,
PType<V3> p3, PType<V4> p4) {
- TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE4, p1, p2, p3, p4);
+ TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE4, (WritableType) p1,
+ (WritableType) p2, (WritableType) p3, (WritableType) p4);
TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3, p4);
return new WritableType(Tuple4.class, TupleWritable.class, input, output, p1, p2, p3, p4);
}
public static WritableType<TupleN, TupleWritable> tuples(PType... ptypes) {
- TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLEN, ptypes);
+ WritableType[] wt = new WritableType[ptypes.length];
+ for (int i = 0; i < wt.length; i++) {
+ wt[i] = (WritableType) ptypes[i];
+ }
+ TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLEN, wt);
TupleTWMapFn output = new TupleTWMapFn(ptypes);
return new WritableType(TupleN.class, TupleWritable.class, input, output, ptypes);
}
public static <T extends Tuple> PType<T> tuples(Class<T> clazz, PType... ptypes) {
Class[] typeArgs = new Class[ptypes.length];
+ WritableType[] wt = new WritableType[ptypes.length];
for (int i = 0; i < typeArgs.length; i++) {
typeArgs[i] = ptypes[i].getTypeClass();
+ wt[i] = (WritableType) ptypes[i];
}
TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
- TWTupleMapFn input = new TWTupleMapFn(factory, ptypes);
+ TWTupleMapFn input = new TWTupleMapFn(factory, wt);
TupleTWMapFn output = new TupleTWMapFn(ptypes);
return new WritableType(clazz, TupleWritable.class, input, output, ptypes);
}
@@ -430,9 +463,11 @@ public class Writables {
}
private static class ArrayCollectionMapFn<T> extends MapFn<GenericArrayWritable, Collection<T>> {
+ private Class<Writable> clazz;
private final MapFn<Object, T> mapFn;
-
- public ArrayCollectionMapFn(MapFn<Object, T> mapFn) {
+
+ public ArrayCollectionMapFn(Class<Writable> clazz, MapFn<Object, T> mapFn) {
+ this.clazz = clazz;
this.mapFn = mapFn;
}
@@ -454,8 +489,9 @@ public class Writables {
@Override
public Collection<T> map(GenericArrayWritable input) {
Collection<T> collection = Lists.newArrayList();
- for (Writable writable : input.get()) {
- collection.add(mapFn.map(writable));
+ for (BytesWritable raw : input.get()) {
+ Writable w = create(clazz, raw);
+ collection.add(mapFn.map(w));
}
return collection;
}
@@ -463,11 +499,9 @@ public class Writables {
private static class CollectionArrayMapFn<T> extends MapFn<Collection<T>, GenericArrayWritable> {
- private final Class<? extends Writable> clazz;
private final MapFn<T, Object> mapFn;
- public CollectionArrayMapFn(Class<? extends Writable> clazz, MapFn<T, Object> mapFn) {
- this.clazz = clazz;
+ public CollectionArrayMapFn(MapFn<T, Object> mapFn) {
this.mapFn = mapFn;
}
@@ -488,27 +522,31 @@ public class Writables {
@Override
public GenericArrayWritable map(Collection<T> input) {
- GenericArrayWritable arrayWritable = new GenericArrayWritable(clazz);
- Writable[] w = new Writable[input.size()];
+ GenericArrayWritable arrayWritable = new GenericArrayWritable();
+ BytesWritable[] w = new BytesWritable[input.size()];
int index = 0;
for (T in : input) {
- w[index++] = ((Writable) mapFn.map(in));
+ Writable v = (Writable) mapFn.map(in);
+ w[index++] = new BytesWritable(WritableUtils.toByteArray(v));
}
arrayWritable.set(w);
return arrayWritable;
}
}
- public static <T> WritableType<Collection<T>, GenericArrayWritable<T>> collections(PType<T> ptype) {
+ public static <T> WritableType<Collection<T>, GenericArrayWritable> collections(PType<T> ptype) {
WritableType<T, ?> wt = (WritableType<T, ?>) ptype;
- return new WritableType(Collection.class, GenericArrayWritable.class, new ArrayCollectionMapFn(wt.getInputMapFn()),
- new CollectionArrayMapFn(wt.getSerializationClass(), wt.getOutputMapFn()), ptype);
+ return new WritableType(Collection.class, GenericArrayWritable.class,
+ new ArrayCollectionMapFn(wt.getSerializationClass(), wt.getInputMapFn()),
+ new CollectionArrayMapFn(wt.getOutputMapFn()), ptype);
}
- private static class MapInputMapFn<T> extends MapFn<TextMapWritable<Writable>, Map<String, T>> {
+ private static class MapInputMapFn<T> extends MapFn<TextMapWritable, Map<String, T>> {
+ private final Class<Writable> clazz;
private final MapFn<Writable, T> mapFn;
- public MapInputMapFn(MapFn<Writable, T> mapFn) {
+ public MapInputMapFn(Class<Writable> clazz, MapFn<Writable, T> mapFn) {
+ this.clazz = clazz;
this.mapFn = mapFn;
}
@@ -528,22 +566,21 @@ public class Writables {
}
@Override
- public Map<String, T> map(TextMapWritable<Writable> input) {
+ public Map<String, T> map(TextMapWritable input) {
Map<String, T> out = Maps.newHashMap();
- for (Map.Entry<Text, Writable> e : input.entrySet()) {
- out.put(e.getKey().toString(), mapFn.map(e.getValue()));
+ for (Map.Entry<Text, BytesWritable> e : input.entrySet()) {
+ Writable v = create(clazz, e.getValue());
+ out.put(e.getKey().toString(), mapFn.map(v));
}
return out;
}
}
- private static class MapOutputMapFn<T> extends MapFn<Map<String, T>, TextMapWritable<Writable>> {
+ private static class MapOutputMapFn<T> extends MapFn<Map<String, T>, TextMapWritable> {
- private final Class<Writable> clazz;
private final MapFn<T, Writable> mapFn;
- public MapOutputMapFn(Class<Writable> clazz, MapFn<T, Writable> mapFn) {
- this.clazz = clazz;
+ public MapOutputMapFn(MapFn<T, Writable> mapFn) {
this.mapFn = mapFn;
}
@@ -563,10 +600,11 @@ public class Writables {
}
@Override
- public TextMapWritable<Writable> map(Map<String, T> input) {
- TextMapWritable<Writable> tmw = new TextMapWritable<Writable>(clazz);
+ public TextMapWritable map(Map<String, T> input) {
+ TextMapWritable tmw = new TextMapWritable();
for (Map.Entry<String, T> e : input.entrySet()) {
- tmw.put(new Text(e.getKey()), mapFn.map(e.getValue()));
+ Writable w = mapFn.map(e.getValue());
+ tmw.put(new Text(e.getKey()), new BytesWritable(WritableUtils.toByteArray(w)));
}
return tmw;
}
@@ -574,8 +612,9 @@ public class Writables {
public static <T> WritableType<Map<String, T>, MapWritable> maps(PType<T> ptype) {
WritableType<T, ?> wt = (WritableType<T, ?>) ptype;
- return new WritableType(Map.class, TextMapWritable.class, new MapInputMapFn(wt.getInputMapFn()),
- new MapOutputMapFn(wt.getSerializationClass(), wt.getOutputMapFn()), ptype);
+ return new WritableType(Map.class, TextMapWritable.class,
+ new MapInputMapFn(wt.getSerializationClass(), wt.getInputMapFn()),
+ new MapOutputMapFn(wt.getOutputMapFn()), ptype);
}
public static <T> PType<T> jsons(Class<T> clazz) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
index 35ccc11..e8727c3 100644
--- a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
@@ -21,9 +21,10 @@ import static org.junit.Assert.assertEquals;
import org.apache.crunch.lib.join.JoinUtils.TupleWritablePartitioner;
import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
import org.junit.Before;
import org.junit.Test;
@@ -39,30 +40,9 @@ public class TupleWritablePartitionerTest {
@Test
public void testGetPartition() {
IntWritable intWritable = new IntWritable(3);
- TupleWritable key = new TupleWritable(new Writable[] { intWritable });
- assertEquals(3, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
- assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
+ BytesWritable bw = new BytesWritable(WritableUtils.toByteArray(intWritable));
+ TupleWritable key = new TupleWritable(new BytesWritable[] { bw });
+ assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
+ assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
}
-
- @Test
- public void testGetPartition_NegativeHashValue() {
- IntWritable intWritable = new IntWritable(-3);
- // Sanity check, if this doesn't work then the premise of this test is wrong
- assertEquals(-3, intWritable.hashCode());
-
- TupleWritable key = new TupleWritable(new Writable[] { intWritable });
- assertEquals(3, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
- assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
- }
-
- @Test
- public void testGetPartition_IntegerMinValue() {
- IntWritable intWritable = new IntWritable(Integer.MIN_VALUE);
- // Sanity check, if this doesn't work then the premise of this test is wrong
- assertEquals(Integer.MIN_VALUE, intWritable.hashCode());
-
- TupleWritable key = new TupleWritable(new Writable[] { intWritable });
- assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), Integer.MAX_VALUE));
- }
-
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
index c807a90..c446a69 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertThat;
import java.util.Arrays;
import org.apache.crunch.test.Tests;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.junit.Test;
@@ -35,36 +36,38 @@ public class GenericArrayWritableTest {
@Test
public void testEmpty() {
- GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
- src.set(new Text[0]);
+ GenericArrayWritable src = new GenericArrayWritable();
+ src.set(new BytesWritable[0]);
- GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+ GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable());
assertThat(dest.get().length, is(0));
}
@Test
public void testNonEmpty() {
- GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
- src.set(new Text[] { new Text("foo"), new Text("bar") });
+ GenericArrayWritable src = new GenericArrayWritable();
+ src.set(new BytesWritable[] { new BytesWritable("foo".getBytes()), new BytesWritable("bar".getBytes()) });
- GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+ GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable());
assertThat(src.get(), not(sameInstance(dest.get())));
assertThat(dest.get().length, is(2));
- assertThat(Arrays.asList(dest.get()), hasItems((Writable) new Text("foo"), new Text("bar")));
+ assertThat(Arrays.asList(dest.get()),
+ hasItems(new BytesWritable("foo".getBytes()), new BytesWritable("bar".getBytes())));
}
@Test
public void testNulls() {
- GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
- src.set(new Text[] { new Text("a"), null, new Text("b") });
+ GenericArrayWritable src = new GenericArrayWritable();
+ src.set(new BytesWritable[] { new BytesWritable("a".getBytes()), null, new BytesWritable("b".getBytes()) });
- GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+ GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable());
assertThat(src.get(), not(sameInstance(dest.get())));
assertThat(dest.get().length, is(3));
- assertThat(Arrays.asList(dest.get()), hasItems((Writable) new Text("a"), new Text("b"), null));
+ assertThat(Arrays.asList(dest.get()),
+ hasItems(new BytesWritable("a".getBytes()), new BytesWritable("b".getBytes()), null));
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
index 65e946b..19a9bfe 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
@@ -58,7 +58,7 @@ public class WritableTypeTest {
@Test
public void testGetDetachedValue_Collection() {
Collection<Text> textCollection = Lists.newArrayList(new Text("value"));
- WritableType<Collection<Text>, GenericArrayWritable<Text>> ptype = Writables
+ WritableType<Collection<Text>, GenericArrayWritable> ptype = Writables
.collections(Writables.writables(Text.class));
ptype.initialize(new Configuration());
http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
index 5396fba..b1f4107 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
import org.junit.Test;
import com.google.common.collect.Lists;
@@ -111,15 +112,22 @@ public class WritablesTest {
String s = "abc";
Collection<String> j = Lists.newArrayList();
j.add(s);
- GenericArrayWritable<Text> w = new GenericArrayWritable<Text>(Text.class);
- w.set(new Text[] { new Text(s) });
+ GenericArrayWritable w = new GenericArrayWritable();
+ Text t = new Text(s);
+ BytesWritable bw = new BytesWritable(WritableUtils.toByteArray(t));
+ w.set(new BytesWritable[] { bw });
testInputOutputFn(Writables.collections(Writables.strings()), j, w);
}
@Test
public void testPairs() throws Exception {
Pair<String, String> j = Pair.of("a", "b");
- TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), });
+ Text[] t = new Text[] { new Text("a"), new Text("b"), };
+ BytesWritable[] b = new BytesWritable[t.length];
+ for (int i = 0; i < t.length; i++) {
+ b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
+ }
+ TupleWritable w = new TupleWritable(b);
w.setWritten(0);
w.setWritten(1);
testInputOutputFn(Writables.pairs(Writables.strings(), Writables.strings()), j, w);
@@ -144,7 +152,12 @@ public class WritablesTest {
@SuppressWarnings("rawtypes")
public void testTriples() throws Exception {
Tuple3 j = Tuple3.of("a", "b", "c");
- TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), });
+ Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), };
+ BytesWritable[] b = new BytesWritable[t.length];
+ for (int i = 0; i < t.length; i++) {
+ b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
+ }
+ TupleWritable w = new TupleWritable(b);
w.setWritten(0);
w.setWritten(1);
w.setWritten(2);
@@ -156,7 +169,12 @@ public class WritablesTest {
@SuppressWarnings("rawtypes")
public void testQuads() throws Exception {
Tuple4 j = Tuple4.of("a", "b", "c", "d");
- TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"), });
+ Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"), };
+ BytesWritable[] b = new BytesWritable[t.length];
+ for (int i = 0; i < t.length; i++) {
+ b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
+ }
+ TupleWritable w = new TupleWritable(b);
w.setWritten(0);
w.setWritten(1);
w.setWritten(2);
@@ -169,8 +187,13 @@ public class WritablesTest {
@Test
public void testTupleN() throws Exception {
TupleN j = new TupleN("a", "b", "c", "d", "e");
- TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"),
- new Text("e"), });
+ Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"),
+ new Text("e"), };
+ BytesWritable[] b = new BytesWritable[t.length];
+ for (int i = 0; i < t.length; i++) {
+ b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
+ }
+ TupleWritable w = new TupleWritable(b);
w.setWritten(0);
w.setWritten(1);
w.setWritten(2);