You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/02/15 00:03:47 UTC
git commit: CRUNCH-329: Fix secondary sorts for writables by
re-introducing type info.
Repository: crunch
Updated Branches:
refs/heads/master fce2b23b8 -> 1a160b653
CRUNCH-329: Fix secondary sorts for writables by re-introducing type info.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1a160b65
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1a160b65
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1a160b65
Branch: refs/heads/master
Commit: 1a160b653509eaa4ce0e9c42ed5919a5ac545b98
Parents: fce2b23
Author: Josh Wills <jw...@apache.org>
Authored: Wed Jan 22 19:54:45 2014 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Feb 13 13:03:29 2014 -0800
----------------------------------------------------------------------
.../org/apache/crunch/lib/SecondarySortIT.java | 19 ++-
.../lib/sort/TupleWritableComparator.java | 44 +-----
.../crunch/types/writable/TupleWritable.java | 127 ++++++++-------
.../apache/crunch/types/writable/Writables.java | 158 ++++++++++++++++---
.../lib/TupleWritablePartitionerTest.java | 3 +-
.../crunch/types/writable/WritablesTest.java | 39 +----
6 files changed, 236 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/1a160b65/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
index 242f621..7284ab1 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
@@ -17,7 +17,6 @@
*/
package org.apache.crunch.lib;
-import static org.apache.crunch.types.avro.Avros.*;
import static org.junit.Assert.assertEquals;
import java.io.Serializable;
@@ -29,6 +28,9 @@ import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
import org.junit.Test;
import com.google.common.base.Joiner;
@@ -38,7 +40,16 @@ import com.google.common.collect.ImmutableList;
public class SecondarySortIT extends CrunchTestSupport implements Serializable {
@Test
- public void testSecondarySort() throws Exception {
+ public void testSecondarySortAvros() throws Exception {
+ runSecondarySort(AvroTypeFamily.getInstance());
+ }
+
+ @Test
+ public void testSecondarySortWritables() throws Exception {
+ runSecondarySort(WritableTypeFamily.getInstance());
+ }
+
+ public void runSecondarySort(PTypeFamily ptf) throws Exception {
Pipeline p = new MRPipeline(SecondarySortIT.class, tempDir.getDefaultConfiguration());
String inputFile = tempDir.copyResourceFileName("secondary_sort_input.txt");
@@ -50,14 +61,14 @@ public class SecondarySortIT extends CrunchTestSupport implements Serializable {
return Pair.of(pieces[0],
Pair.of(Integer.valueOf(pieces[1].trim()), Integer.valueOf(pieces[2].trim())));
}
- }, tableOf(strings(), pairs(ints(), ints())));
+ }, ptf.tableOf(ptf.strings(), ptf.pairs(ptf.ints(), ptf.ints())));
Iterable<String> lines = SecondarySort.sortAndApply(in, new MapFn<Pair<String, Iterable<Pair<Integer, Integer>>>, String>() {
@Override
public String map(Pair<String, Iterable<Pair<Integer, Integer>>> input) {
Joiner j = Joiner.on(',');
return j.join(input.first(), j.join(input.second()));
}
- }, strings()).materialize();
+ }, ptf.strings()).materialize();
assertEquals(ImmutableList.of("one,[-5,10],[1,1],[2,-3]", "three,[0,-1]", "two,[1,7],[2,6],[4,5]"),
ImmutableList.copyOf(lines));
p.done();
http://git-wip-us.apache.org/repos/asf/crunch/blob/1a160b65/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
index 9677fc1..9d16821 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
@@ -17,37 +17,26 @@
*/
package org.apache.crunch.lib.sort;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
import com.google.common.collect.Lists;
-import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.lib.Sort.ColumnOrder;
import org.apache.crunch.lib.Sort.Order;
import org.apache.crunch.types.writable.TupleWritable;
import org.apache.crunch.types.writable.WritableType;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
-import com.google.common.base.Function;
import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.io.WritableFactories;
public class TupleWritableComparator extends WritableComparator implements Configurable {
private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
private Configuration conf;
- Writable[] w1;
- Writable[] w2;
private ColumnOrder[] columnOrders;
public TupleWritableComparator() {
@@ -57,9 +46,7 @@ public class TupleWritableComparator extends WritableComparator implements Confi
public static void configureOrdering(Configuration conf, WritableType[] types, ColumnOrder[] columnOrders) {
List<String> ordering = Lists.newArrayList();
for (int i = 0; i < types.length; i++) {
- Class<?> cls = types[i].getSerializationClass();
- String order = columnOrders[i].order().name();
- ordering.add(cls.getCanonicalName() + ";" + order);
+ ordering.add(columnOrders[i].order().name());
}
conf.set(CRUNCH_ORDERING_PROPERTY, Joiner.on(",").join(ordering));
}
@@ -84,22 +71,16 @@ public class TupleWritableComparator extends WritableComparator implements Confi
} else if (!ta.has(index) && tb.has(index)) {
return -order;
} else {
- BytesWritable v1 = ta.get(index);
- BytesWritable v2 = tb.get(index);
+ Writable v1 = ta.get(index);
+ Writable v2 = tb.get(index);
if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
- try {
- w1[index].readFields(new DataInputStream(new ByteArrayInputStream(v1.getBytes())));
- w2[index].readFields(new DataInputStream(new ByteArrayInputStream(v2.getBytes())));
- } catch (IOException e) {
- throw new CrunchRuntimeException(e);
- }
- if (w1[index] instanceof WritableComparable && w2[index] instanceof WritableComparable) {
- int cmp = ((WritableComparable) w1[index]).compareTo((WritableComparable) w2[index]);
+ if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
+ int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
if (cmp != 0) {
return order * cmp;
}
} else {
- int cmp = w1[index].hashCode() - w2[index].hashCode();
+ int cmp = v1.hashCode() - v2.hashCode();
if (cmp != 0) {
return order * cmp;
}
@@ -122,19 +103,8 @@ public class TupleWritableComparator extends WritableComparator implements Confi
String ordering = conf.get(CRUNCH_ORDERING_PROPERTY);
String[] columnOrderNames = ordering.split(",");
columnOrders = new ColumnOrder[columnOrderNames.length];
- w1 = new Writable[columnOrderNames.length];
- w2 = new Writable[columnOrderNames.length];
for (int i = 0; i < columnOrders.length; i++) {
- String[] split = columnOrderNames[i].split(";");
- String className = split[0];
- try {
- Class cls = Class.forName(className);
- w1[i] = WritableFactories.newInstance(cls);
- w2[i] = WritableFactories.newInstance(cls);
- } catch (Exception e) {
- throw new CrunchRuntimeException(e);
- }
- Order order = Order.valueOf(split[1]);
+ Order order = Order.valueOf(columnOrderNames[i]);
columnOrders[i] = ColumnOrder.by(i + 1, order);
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/1a160b65/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
index 251e4f5..1362132 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
@@ -17,16 +17,16 @@
*/
package org.apache.crunch.types.writable;
-import java.io.ByteArrayInputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.List;
+import java.util.Arrays;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.crunch.CrunchRuntimeException;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableFactories;
@@ -37,42 +37,64 @@ import org.apache.hadoop.io.WritableUtils;
* added here because of its package visibility restrictions.
*
*/
-public class TupleWritable implements WritableComparable<TupleWritable> {
+public class TupleWritable extends Configured implements WritableComparable<TupleWritable> {
+
+ private int[] written;
+ private Writable[] values;
- private long written;
- private BytesWritable[] values;
- private List<Class<Writable>> writableClasses;
-
/**
* Create an empty tuple with no allocated storage for writables.
*/
public TupleWritable() {
}
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ if (conf == null) return;
+
+ try {
+ Writables.reloadWritableComparableCodes(conf);
+ } catch (Exception e) {
+ throw new CrunchRuntimeException("Error reloading writable comparable codes", e);
+ }
+ }
+
+ private static int[] getCodes(Writable[] writables) {
+ int[] b = new int[writables.length];
+ for (int i = 0; i < b.length; i++) {
+ if (writables[i] != null) {
+ b[i] = getCode(writables[i].getClass());
+ }
+ }
+ return b;
+ }
+
+ public TupleWritable(Writable[] values) {
+ this(values, getCodes(values));
+ }
+
/**
* Initialize tuple with storage; unknown whether any of them contain
* "written" values.
*/
- public TupleWritable(BytesWritable[] vals) {
- written = 0L;
- values = vals;
+ public TupleWritable(Writable[] values, int[] written) {
+ Preconditions.checkArgument(values.length == written.length);
+ this.written = written;
+ this.values = values;
}
- public void setWritableClasses(List<Class<Writable>> writableClasses) {
- this.writableClasses = writableClasses;
- }
-
/**
* Return true if tuple has an element at the position provided.
*/
public boolean has(int i) {
- return 0 != ((1 << i) & written);
+ return written[i] != 0;
}
/**
* Get ith Writable from Tuple.
*/
- public BytesWritable get(int i) {
+ public Writable get(int i) {
return values[i];
}
@@ -89,13 +111,13 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
public boolean equals(Object other) {
if (other instanceof TupleWritable) {
TupleWritable that = (TupleWritable) other;
- if (this.size() != that.size() || this.written != that.written) {
+ if (this.size() != that.size()) {
return false;
}
for (int i = 0; i < values.length; ++i) {
if (!has(i))
continue;
- if (!values[i].equals(that.get(i))) {
+ if (written[i] != that.written[i] || !values[i].equals(that.values[i])) {
return false;
}
}
@@ -121,17 +143,7 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
StringBuffer buf = new StringBuffer("[");
for (int i = 0; i < values.length; ++i) {
if (has(i)) {
- if (writableClasses != null) {
- Writable w = WritableFactories.newInstance(writableClasses.get(i));
- try {
- w.readFields(new DataInputStream(new ByteArrayInputStream(values[i].getBytes())));
- } catch (IOException e) {
- throw new CrunchRuntimeException(e);
- }
- buf.append(w.toString());
- } else {
- buf.append(values[i].toString());
- }
+ buf.append(values[i].toString());
}
buf.append(",");
}
@@ -142,6 +154,15 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
return buf.toString();
}
+ public void clear() {
+ Arrays.fill(written, (byte) 0);
+ }
+
+ public void set(int index, Writable w) {
+ written[index] = getCode(w.getClass());
+ values[index] = w;
+ }
+
/**
* Writes each Writable to <code>out</code>. TupleWritable format:
* {@code
@@ -150,9 +171,9 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
*/
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, values.length);
- WritableUtils.writeVLong(out, written);
for (int i = 0; i < values.length; ++i) {
- if (has(i)) {
+ WritableUtils.writeVInt(out, written[i]);
+ if (written[i] != 0) {
values[i].write(out);
}
}
@@ -163,36 +184,32 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
*/
public void readFields(DataInput in) throws IOException {
int card = WritableUtils.readVInt(in);
- values = new BytesWritable[card];
- written = WritableUtils.readVLong(in);
+ values = new Writable[card];
+ written = new int[card];
for (int i = 0; i < card; ++i) {
- if (has(i)) {
- values[i] = new BytesWritable();
+ written[i] = WritableUtils.readVInt(in);
+ if (written[i] != 0) {
+ values[i] = getWritable(written[i], getConf());
values[i].readFields(in);
}
}
}
- /**
- * Record that the tuple contains an element at the position provided.
- */
- public void setWritten(int i) {
- written |= 1 << i;
- }
-
- /**
- * Record that the tuple does not contain an element at the position provided.
- */
- public void clearWritten(int i) {
- written &= -1 ^ (1 << i);
+ static int getCode(Class<? extends Writable> clazz) {
+ if (Writables.WRITABLE_CODES.inverse().containsKey(clazz)) {
+ return Writables.WRITABLE_CODES.inverse().get(clazz);
+ } else {
+ return 1; // default for BytesWritable
+ }
}
- /**
- * Clear any record of which writables have been written to, without releasing
- * storage.
- */
- public void clearWritten() {
- written = 0L;
+ static Writable getWritable(int code, Configuration conf) {
+ Class<? extends Writable> clazz = Writables.WRITABLE_CODES.get(code);
+ if (clazz != null) {
+ return WritableFactories.newInstance(clazz, conf);
+ } else {
+ throw new IllegalStateException("Unknown Writable code: " + code);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/1a160b65/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
index d8ad6ca..a121ae3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -18,13 +18,23 @@
package org.apache.crunch.types.writable;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableBiMap;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.MapFn;
import org.apache.crunch.Pair;
@@ -49,6 +59,7 @@ import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
@@ -63,6 +74,78 @@ import com.google.common.collect.Maps;
*
*/
public class Writables {
+
+ private static final Log LOG = LogFactory.getLog(Writables.class);
+
+ static BiMap<Integer, Class<? extends Writable>> WRITABLE_CODES = HashBiMap.create(ImmutableBiMap.<Integer, Class<? extends Writable>>builder()
+ .put(1, BytesWritable.class)
+ .put(2, Text.class)
+ .put(3, IntWritable.class)
+ .put(4, LongWritable.class)
+ .put(5, FloatWritable.class)
+ .put(6, DoubleWritable.class)
+ .put(7, BooleanWritable.class)
+ .put(8, TupleWritable.class)
+ .put(9, TextMapWritable.class)
+ .put(10, UnionWritable.class)
+ .build());
+
+ /**
+ * Registers a {@code WritableComparable} class so that it can be used for comparing the fields inside of
+ * tuple types (e.g., {@code pairs}, {@code trips}, {@code tupleN}, etc.) for use in sorts and
+ * secondary sorts.
+ *
+ * @param clazz The WritableComparable class to register
+ * @see #registerComparable(Class, int)
+ */
+ public static void registerComparable(Class<? extends WritableComparable> clazz) {
+ int code = clazz.hashCode();
+ if (code < 0) {
+ code = -code;
+ }
+ if (code < WRITABLE_CODES.size()) {
+ code += WRITABLE_CODES.size();
+ }
+ registerComparable(clazz, code);
+ }
+
+ /**
+ * Registers a {@code WritableComparable} class with a given integer code to use for serializing
+ * and deserializing instances of this class that are defined inside of tuple types (e.g., {@code pairs},
+ * {@code trips}, {@code tupleN}, etc.) Unregistered Writables are always serialized to bytes and
+ * cannot be used in comparisons (e.g., sorts and secondary sorts) according to their underlying types.
+ *
+ * @param clazz The class to register
+ * @param code The unique registration code for the class, which must not already be assigned (codes 1 through 10 are built in)
+ */
+ public static void registerComparable(Class<? extends WritableComparable> clazz, int code) {
+ if (WRITABLE_CODES.containsKey(code)) {
+ throw new IllegalArgumentException("Already have writable class assigned to code = " + code);
+ }
+ }
+
+ private static final String WRITABLE_COMPARABLE_CODES = "crunch.writable.comparable.codes";
+
+ private static void serializeWritableComparableCodes(Configuration conf) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(WRITABLE_CODES);
+ oos.close();
+ conf.set(WRITABLE_COMPARABLE_CODES, Base64.encodeBase64String(baos.toByteArray()));
+ }
+
+ static void reloadWritableComparableCodes(Configuration conf) throws Exception {
+ if (conf.get(WRITABLE_COMPARABLE_CODES) != null) {
+ ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(conf.get(WRITABLE_COMPARABLE_CODES)));
+ ObjectInputStream ois = new ObjectInputStream(bais);
+ BiMap<Integer, Class<? extends Writable>> codes = (BiMap<Integer, Class<? extends Writable>>) ois.readObject();
+ ois.close();
+ for (Map.Entry<Integer, Class<? extends Writable>> e : codes.entrySet()) {
+ WRITABLE_CODES.put(e.getKey(), e.getValue());
+ }
+ }
+ }
+
private static final MapFn<NullWritable, Void> NULL_WRITABLE_TO_VOID = new MapFn<NullWritable, Void>() {
@Override
public Void map(NullWritable input) {
@@ -280,14 +363,19 @@ public class Writables {
return new WritableTableType((WritableType) key, (WritableType) value);
}
- private static <W extends Writable> W create(Class<W> clazz, BytesWritable bytes) {
- W instance = (W) WritableFactories.newInstance(clazz);
- try {
- instance.readFields(new DataInputStream(new ByteArrayInputStream(bytes.getBytes())));
- } catch (IOException e) {
- throw new CrunchRuntimeException(e);
+ private static <W extends Writable> W create(Class<W> clazz, Writable writable) {
+ if (clazz.equals(writable.getClass())) {
+ return (W) writable;
+ } else {
+ W instance = (W) WritableFactories.newInstance(clazz);
+ BytesWritable bytes = (BytesWritable) writable;
+ try {
+ instance.readFields(new DataInputStream(new ByteArrayInputStream(bytes.getBytes())));
+ } catch (IOException e) {
+ throw new CrunchRuntimeException(e);
+ }
+ return instance;
}
- return instance;
}
/**
@@ -307,12 +395,26 @@ public class Writables {
this.writableClasses = Lists.newArrayList();
for (WritableType ptype : ptypes) {
fns.add(ptype.getInputMapFn());
- writableClasses.add(ptype.getSerializationClass());
+ Class<Writable> clazz = ptype.getSerializationClass();
+ if (WritableComparable.class.isAssignableFrom(clazz)) {
+ if (!WRITABLE_CODES.inverse().containsKey(clazz)) {
+ LOG.warn(String.format(
+ "WritableComparable class %s in tuple type should be registered with Writables.registerComparable",
+ clazz.toString()));
+ }
+ }
+ writableClasses.add(clazz);
}
}
@Override
public void configure(Configuration conf) {
+ try {
+ serializeWritableComparableCodes(conf);
+ } catch (IOException e) {
+ throw new CrunchRuntimeException("Error serializing writable comparable codes", e);
+ }
+
for (MapFn fn : fns) {
fn.configure(conf);
}
@@ -327,9 +429,11 @@ public class Writables {
@Override
public void initialize() {
+
for (MapFn fn : fns) {
fn.initialize();
}
+
// The rest of the methods allocate new
// objects each time. However this one
// uses Tuple.tuplify which does a copy
@@ -357,23 +461,28 @@ public class Writables {
*/
private static class TupleTWMapFn extends MapFn<Tuple, TupleWritable> {
- private transient TupleWritable writable;
- private transient BytesWritable[] values;
-
private final List<MapFn> fns;
- private final List<Class<Writable>> writableClasses;
-
+
+ private transient int[] written;
+ private transient Writable[] values;
+
public TupleTWMapFn(PType<?>... ptypes) {
this.fns = Lists.newArrayList();
- this.writableClasses = Lists.newArrayList();
for (PType<?> ptype : ptypes) {
fns.add(ptype.getOutputMapFn());
- writableClasses.add(((WritableType) ptype).getSerializationClass());
}
+
+ this.written = new int[fns.size()];
+ this.values = new Writable[fns.size()];
}
@Override
public void configure(Configuration conf) {
+ try {
+ serializeWritableComparableCodes(conf);
+ } catch (IOException e) {
+ throw new CrunchRuntimeException("Error serializing writable comparable codes", e);
+ }
for (MapFn fn : fns) {
fn.configure(conf);
}
@@ -388,26 +497,31 @@ public class Writables {
@Override
public void initialize() {
- this.values = new BytesWritable[fns.size()];
- this.writable = new TupleWritable(values);
- this.writable.setWritableClasses(writableClasses);
for (MapFn fn : fns) {
fn.initialize();
}
+ this.written = new int[fns.size()];
+ this.values = new Writable[fns.size()];
}
@Override
public TupleWritable map(Tuple input) {
- writable.clearWritten();
+ Arrays.fill(written, (byte) 0);
+ Arrays.fill(values, null);
for (int i = 0; i < input.size(); i++) {
Object value = input.get(i);
if (value != null) {
- writable.setWritten(i);
Writable w = (Writable) fns.get(i).map(value);
- values[i] = new BytesWritable(WritableUtils.toByteArray(w));
+ if (WRITABLE_CODES.inverse().containsKey(w.getClass())) {
+ values[i] = w;
+ written[i] = WRITABLE_CODES.inverse().get(w.getClass());
+ } else {
+ values[i] = new BytesWritable(WritableUtils.toByteArray(w));
+ written[i] = 1; // code for BytesWritable
+ }
}
}
- return writable;
+ return new TupleWritable(values, written);
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/1a160b65/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
index e8727c3..aee185a 100644
--- a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
@@ -24,6 +24,7 @@ import org.apache.crunch.types.writable.TupleWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.junit.Before;
import org.junit.Test;
@@ -41,7 +42,7 @@ public class TupleWritablePartitionerTest {
public void testGetPartition() {
IntWritable intWritable = new IntWritable(3);
BytesWritable bw = new BytesWritable(WritableUtils.toByteArray(intWritable));
- TupleWritable key = new TupleWritable(new BytesWritable[] { bw });
+ TupleWritable key = new TupleWritable(new Writable[] { bw });
assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/1a160b65/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
index b1f4107..3a6fc18 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
@@ -19,7 +19,6 @@ package org.apache.crunch.types.writable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import java.io.DataInput;
@@ -123,13 +122,7 @@ public class WritablesTest {
public void testPairs() throws Exception {
Pair<String, String> j = Pair.of("a", "b");
Text[] t = new Text[] { new Text("a"), new Text("b"), };
- BytesWritable[] b = new BytesWritable[t.length];
- for (int i = 0; i < t.length; i++) {
- b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
- }
- TupleWritable w = new TupleWritable(b);
- w.setWritten(0);
- w.setWritten(1);
+ TupleWritable w = new TupleWritable(t);
testInputOutputFn(Writables.pairs(Writables.strings(), Writables.strings()), j, w);
}
@@ -153,14 +146,7 @@ public class WritablesTest {
public void testTriples() throws Exception {
Tuple3 j = Tuple3.of("a", "b", "c");
Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), };
- BytesWritable[] b = new BytesWritable[t.length];
- for (int i = 0; i < t.length; i++) {
- b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
- }
- TupleWritable w = new TupleWritable(b);
- w.setWritten(0);
- w.setWritten(1);
- w.setWritten(2);
+ TupleWritable w = new TupleWritable(t);
WritableType<?, ?> wt = Writables.triples(Writables.strings(), Writables.strings(), Writables.strings());
testInputOutputFn(wt, j, w);
}
@@ -170,15 +156,7 @@ public class WritablesTest {
public void testQuads() throws Exception {
Tuple4 j = Tuple4.of("a", "b", "c", "d");
Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"), };
- BytesWritable[] b = new BytesWritable[t.length];
- for (int i = 0; i < t.length; i++) {
- b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
- }
- TupleWritable w = new TupleWritable(b);
- w.setWritten(0);
- w.setWritten(1);
- w.setWritten(2);
- w.setWritten(3);
+ TupleWritable w = new TupleWritable(t);
WritableType<?, ?> wt = Writables.quads(Writables.strings(), Writables.strings(), Writables.strings(),
Writables.strings());
testInputOutputFn(wt, j, w);
@@ -189,16 +167,7 @@ public class WritablesTest {
TupleN j = new TupleN("a", "b", "c", "d", "e");
Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"),
new Text("e"), };
- BytesWritable[] b = new BytesWritable[t.length];
- for (int i = 0; i < t.length; i++) {
- b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
- }
- TupleWritable w = new TupleWritable(b);
- w.setWritten(0);
- w.setWritten(1);
- w.setWritten(2);
- w.setWritten(3);
- w.setWritten(4);
+ TupleWritable w = new TupleWritable(t);
WritableType<?, ?> wt = Writables.tuples(Writables.strings(), Writables.strings(), Writables.strings(),
Writables.strings(), Writables.strings());
testInputOutputFn(wt, j, w);