You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:07 UTC
[05/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
deleted file mode 100644
index fc30eaf..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ /dev/null
@@ -1,709 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.util.Utf8;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.fn.CompositeMapFn;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.types.CollectionDeepCopier;
-import org.apache.crunch.types.DeepCopier;
-import org.apache.crunch.types.MapDeepCopier;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypes;
-import org.apache.crunch.types.TupleDeepCopier;
-import org.apache.crunch.types.TupleFactory;
-import org.apache.crunch.types.writable.WritableDeepCopier;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Defines static methods that are analogous to the methods defined in
- * {@link AvroTypeFamily} for convenient static importing.
- *
- */
-public class Avros {
-
- /**
- * Older versions of Avro (i.e., before 1.7.0) do not support schemas that are
- * composed of a mix of specific and reflection-based schemas. This bit
- * controls whether or not we allow Crunch jobs to be created that involve
- * mixing specific and reflection-based schemas and can be overridden by the
- * client developer.
- */
- public static final boolean CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS;
-
- static {
- CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS = AvroCapabilities.canDecodeSpecificSchemaWithReflectDatumReader();
- }
-
- /**
- * The instance we use for generating reflected schemas. May be modified by
- * clients (e.g., Scrunch.)
- */
- public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory();
-
- /**
- * The name of the configuration parameter that tracks which reflection
- * factory to use.
- */
- public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
-
- public static void configureReflectDataFactory(Configuration conf) {
- conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class);
- }
-
- public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
- return (ReflectDataFactory) ReflectionUtils.newInstance(
- conf.getClass(REFLECT_DATA_FACTORY_CLASS, ReflectDataFactory.class), conf);
- }
-
- public static void checkCombiningSpecificAndReflectionSchemas() {
- if (!CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS) {
- throw new IllegalStateException("Crunch does not support running jobs that"
- + " contain a mixture of reflection-based and avro-generated data types."
- + " Please consider turning your reflection-based type into an avro-generated"
- + " type and using that generated type instead."
- + " If the version of Avro you are using is 1.7.0 or greater, you can enable"
- + " combined schemas by setting the Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS" + " field to 'true'.");
- }
- }
-
- public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() {
- @Override
- public String map(CharSequence input) {
- return input.toString();
- }
- };
-
- public static MapFn<String, Utf8> STRING_TO_UTF8 = new MapFn<String, Utf8>() {
- @Override
- public Utf8 map(String input) {
- return new Utf8(input);
- }
- };
-
- public static MapFn<Object, ByteBuffer> BYTES_IN = new MapFn<Object, ByteBuffer>() {
- @Override
- public ByteBuffer map(Object input) {
- if (input instanceof ByteBuffer) {
- return (ByteBuffer) input;
- }
- return ByteBuffer.wrap((byte[]) input);
- }
- };
-
- private static final AvroType<String> strings = new AvroType<String>(String.class, Schema.create(Schema.Type.STRING),
- UTF8_TO_STRING, STRING_TO_UTF8, new DeepCopier.NoOpDeepCopier<String>());
- private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL);
- private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG);
- private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT);
- private static final AvroType<Float> floats = create(Float.class, Schema.Type.FLOAT);
- private static final AvroType<Double> doubles = create(Double.class, Schema.Type.DOUBLE);
- private static final AvroType<Boolean> booleans = create(Boolean.class, Schema.Type.BOOLEAN);
- private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class,
- Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(), new DeepCopier.NoOpDeepCopier<ByteBuffer>());
-
- private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder()
- .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats)
- .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
-
- private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap();
-
- public static <T> void register(Class<T> clazz, AvroType<T> ptype) {
- EXTENSIONS.put(clazz, ptype);
- }
-
- public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
- return (PType<T>) PRIMITIVES.get(clazz);
- }
-
- static <T> boolean isPrimitive(AvroType<T> avroType) {
- return avroType.getTypeClass().isPrimitive() || PRIMITIVES.containsKey(avroType.getTypeClass());
- }
-
- private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
- return new AvroType<T>(clazz, Schema.create(schemaType), new DeepCopier.NoOpDeepCopier<T>());
- }
-
- public static final AvroType<Void> nulls() {
- return nulls;
- }
-
- public static final AvroType<String> strings() {
- return strings;
- }
-
- public static final AvroType<Long> longs() {
- return longs;
- }
-
- public static final AvroType<Integer> ints() {
- return ints;
- }
-
- public static final AvroType<Float> floats() {
- return floats;
- }
-
- public static final AvroType<Double> doubles() {
- return doubles;
- }
-
- public static final AvroType<Boolean> booleans() {
- return booleans;
- }
-
- public static final AvroType<ByteBuffer> bytes() {
- return bytes;
- }
-
- public static final <T> AvroType<T> records(Class<T> clazz) {
- if (EXTENSIONS.containsKey(clazz)) {
- return (AvroType<T>) EXTENSIONS.get(clazz);
- }
- return containers(clazz);
- }
-
- public static final AvroType<GenericData.Record> generics(Schema schema) {
- return new AvroType<GenericData.Record>(GenericData.Record.class, schema, new AvroDeepCopier.AvroGenericDeepCopier(
- schema));
- }
-
- public static final <T> AvroType<T> containers(Class<T> clazz) {
- if (SpecificRecord.class.isAssignableFrom(clazz)) {
- return (AvroType<T>) specifics((Class<SpecificRecord>) clazz);
- }
- return reflects(clazz);
- }
-
- public static final <T extends SpecificRecord> AvroType<T> specifics(Class<T> clazz) {
- T t = ReflectionUtils.newInstance(clazz, null);
- Schema schema = t.getSchema();
- return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroSpecificDeepCopier<T>(clazz, schema));
- }
-
- public static final <T> AvroType<T> reflects(Class<T> clazz) {
- Schema schema = REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz);
- return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroReflectDeepCopier<T>(clazz, schema));
- }
-
- private static class BytesToWritableMapFn<T extends Writable> extends MapFn<Object, T> {
- private static final Log LOG = LogFactory.getLog(BytesToWritableMapFn.class);
-
- private final Class<T> writableClazz;
-
- public BytesToWritableMapFn(Class<T> writableClazz) {
- this.writableClazz = writableClazz;
- }
-
- @Override
- public T map(Object input) {
- ByteBuffer byteBuffer = BYTES_IN.map(input);
- T instance = ReflectionUtils.newInstance(writableClazz, null);
- try {
- instance.readFields(new DataInputStream(new ByteArrayInputStream(byteBuffer.array(),
- byteBuffer.arrayOffset(), byteBuffer.limit())));
- } catch (IOException e) {
- LOG.error("Exception thrown reading instance of: " + writableClazz, e);
- }
- return instance;
- }
- }
-
- private static class WritableToBytesMapFn<T extends Writable> extends MapFn<T, ByteBuffer> {
- private static final Log LOG = LogFactory.getLog(WritableToBytesMapFn.class);
-
- @Override
- public ByteBuffer map(T input) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream das = new DataOutputStream(baos);
- try {
- input.write(das);
- } catch (IOException e) {
- LOG.error("Exception thrown converting Writable to bytes", e);
- }
- return ByteBuffer.wrap(baos.toByteArray());
- }
- }
-
- public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) {
- return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(clazz),
- new WritableToBytesMapFn<T>(), new WritableDeepCopier<T>(clazz));
- }
-
- private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> {
-
- private final MapFn<Object, T> mapFn;
-
- public GenericDataArrayToCollection(MapFn<Object, T> mapFn) {
- this.mapFn = mapFn;
- }
-
- @Override
- public void configure(Configuration conf) {
- mapFn.configure(conf);
- }
-
- @Override
- public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
- mapFn.setContext(context);
- }
-
- @Override
- public void initialize() {
- mapFn.initialize();
- }
-
- @Override
- public Collection<T> map(Object input) {
- Collection<T> ret = Lists.newArrayList();
- if (input instanceof Collection) {
- for (Object in : (Collection<Object>) input) {
- ret.add(mapFn.map(in));
- }
- } else {
- // Assume it is an array
- Object[] arr = (Object[]) input;
- for (Object in : arr) {
- ret.add(mapFn.map(in));
- }
- }
- return ret;
- }
- }
-
- private static class CollectionToGenericDataArray extends MapFn<Collection<?>, GenericData.Array<?>> {
-
- private final MapFn mapFn;
- private final String jsonSchema;
- private transient Schema schema;
-
- public CollectionToGenericDataArray(Schema schema, MapFn mapFn) {
- this.mapFn = mapFn;
- this.jsonSchema = schema.toString();
- }
-
- @Override
- public void configure(Configuration conf) {
- mapFn.configure(conf);
- }
-
- @Override
- public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
- mapFn.setContext(context);
- }
-
- @Override
- public void initialize() {
- mapFn.initialize();
- }
-
- @Override
- public GenericData.Array<?> map(Collection<?> input) {
- if (schema == null) {
- schema = new Schema.Parser().parse(jsonSchema);
- }
- GenericData.Array array = new GenericData.Array(input.size(), schema);
- for (Object in : input) {
- array.add(mapFn.map(in));
- }
- return array;
- }
- }
-
- public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
- AvroType<T> avroType = (AvroType<T>) ptype;
- Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema()));
- GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn());
- CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn());
- return new AvroType(Collection.class, collectionSchema, input, output, new CollectionDeepCopier<T>(ptype), ptype);
- }
-
- private static class AvroMapToMap<T> extends MapFn<Map<CharSequence, Object>, Map<String, T>> {
- private final MapFn<Object, T> mapFn;
-
- public AvroMapToMap(MapFn<Object, T> mapFn) {
- this.mapFn = mapFn;
- }
-
- @Override
- public void configure(Configuration conf) {
- mapFn.configure(conf);
- }
-
- @Override
- public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
- mapFn.setContext(context);
- }
-
- @Override
- public void initialize() {
- mapFn.initialize();
- }
-
- @Override
- public Map<String, T> map(Map<CharSequence, Object> input) {
- Map<String, T> out = Maps.newHashMap();
- for (Map.Entry<CharSequence, Object> e : input.entrySet()) {
- out.put(e.getKey().toString(), mapFn.map(e.getValue()));
- }
- return out;
- }
- }
-
- private static class MapToAvroMap<T> extends MapFn<Map<String, T>, Map<Utf8, Object>> {
- private final MapFn<T, Object> mapFn;
-
- public MapToAvroMap(MapFn<T, Object> mapFn) {
- this.mapFn = mapFn;
- }
-
- @Override
- public void configure(Configuration conf) {
- mapFn.configure(conf);
- }
-
- @Override
- public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
- mapFn.setContext(context);
- }
-
- @Override
- public void initialize() {
- this.mapFn.initialize();
- }
-
- @Override
- public Map<Utf8, Object> map(Map<String, T> input) {
- Map<Utf8, Object> out = Maps.newHashMap();
- for (Map.Entry<String, T> e : input.entrySet()) {
- out.put(new Utf8(e.getKey()), mapFn.map(e.getValue()));
- }
- return out;
- }
- }
-
- public static final <T> AvroType<Map<String, T>> maps(PType<T> ptype) {
- AvroType<T> avroType = (AvroType<T>) ptype;
- Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema()));
- AvroMapToMap<T> inputFn = new AvroMapToMap<T>(avroType.getInputMapFn());
- MapToAvroMap<T> outputFn = new MapToAvroMap<T>(avroType.getOutputMapFn());
- return new AvroType(Map.class, mapSchema, inputFn, outputFn, new MapDeepCopier<T>(ptype), ptype);
- }
-
- private static class GenericRecordToTuple extends MapFn<GenericRecord, Tuple> {
- private final TupleFactory<?> tupleFactory;
- private final List<MapFn> fns;
-
- private transient Object[] values;
-
- public GenericRecordToTuple(TupleFactory<?> tupleFactory, PType<?>... ptypes) {
- this.tupleFactory = tupleFactory;
- this.fns = Lists.newArrayList();
- for (PType<?> ptype : ptypes) {
- AvroType atype = (AvroType) ptype;
- fns.add(atype.getInputMapFn());
- }
- }
-
- @Override
- public void configure(Configuration conf) {
- for (MapFn fn : fns) {
- fn.configure(conf);
- }
- }
-
- @Override
- public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
- for (MapFn fn : fns) {
- fn.setContext(context);
- }
- }
-
- @Override
- public void initialize() {
- for (MapFn fn : fns) {
- fn.initialize();
- }
- this.values = new Object[fns.size()];
- tupleFactory.initialize();
- }
-
- @Override
- public Tuple map(GenericRecord input) {
- for (int i = 0; i < values.length; i++) {
- Object v = input.get(i);
- if (v == null) {
- values[i] = null;
- } else {
- values[i] = fns.get(i).map(v);
- }
- }
- return tupleFactory.makeTuple(values);
- }
- }
-
- private static class TupleToGenericRecord extends MapFn<Tuple, GenericRecord> {
- private final List<MapFn> fns;
- private final List<AvroType> avroTypes;
- private final String jsonSchema;
- private final boolean isReflect;
- private transient Schema schema;
-
- public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
- this.fns = Lists.newArrayList();
- this.avroTypes = Lists.newArrayList();
- this.jsonSchema = schema.toString();
- boolean reflectFound = false;
- boolean specificFound = false;
- for (PType ptype : ptypes) {
- AvroType atype = (AvroType) ptype;
- fns.add(atype.getOutputMapFn());
- avroTypes.add(atype);
- if (atype.hasReflect()) {
- reflectFound = true;
- }
- if (atype.hasSpecific()) {
- specificFound = true;
- }
- }
- if (specificFound && reflectFound) {
- checkCombiningSpecificAndReflectionSchemas();
- }
- this.isReflect = reflectFound;
- }
-
- @Override
- public void configure(Configuration conf) {
- for (MapFn fn : fns) {
- fn.configure(conf);
- }
- }
-
- @Override
- public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
- for (MapFn fn : fns) {
- fn.setContext(getContext());
- }
- }
-
- @Override
- public void initialize() {
- this.schema = new Schema.Parser().parse(jsonSchema);
- for (MapFn fn : fns) {
- fn.initialize();
- }
- }
-
- private GenericRecord createRecord() {
- if (isReflect) {
- return new ReflectGenericRecord(schema);
- } else {
- return new GenericData.Record(schema);
- }
- }
-
- @Override
- public GenericRecord map(Tuple input) {
- GenericRecord record = createRecord();
- for (int i = 0; i < input.size(); i++) {
- Object v = input.get(i);
- if (v == null) {
- record.put(i, null);
- } else {
- record.put(i, fns.get(i).map(v));
- }
- }
- return record;
- }
- }
-
- public static final <V1, V2> AvroType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
- Schema schema = createTupleSchema(p1, p2);
- GenericRecordToTuple input = new GenericRecordToTuple(TupleFactory.PAIR, p1, p2);
- TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2);
- return new AvroType(Pair.class, schema, input, output, new TupleDeepCopier(Pair.class, p1, p2), p1, p2);
- }
-
- public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
- Schema schema = createTupleSchema(p1, p2, p3);
- return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3),
- new TupleToGenericRecord(schema, p1, p2, p3), new TupleDeepCopier(Tuple3.class, p1, p2, p3), p1, p2, p3);
- }
-
- public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3,
- PType<V4> p4) {
- Schema schema = createTupleSchema(p1, p2, p3, p4);
- return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4),
- new TupleToGenericRecord(schema, p1, p2, p3, p4), new TupleDeepCopier(Tuple4.class, p1, p2, p3, p4), p1, p2,
- p3, p4);
- }
-
- public static final AvroType<TupleN> tuples(PType... ptypes) {
- Schema schema = createTupleSchema(ptypes);
- return new AvroType(TupleN.class, schema, new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes),
- new TupleToGenericRecord(schema, ptypes), new TupleDeepCopier(TupleN.class, ptypes), ptypes);
- }
-
- public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... ptypes) {
- Schema schema = createTupleSchema(ptypes);
- Class[] typeArgs = new Class[ptypes.length];
- for (int i = 0; i < typeArgs.length; i++) {
- typeArgs[i] = ptypes[i].getTypeClass();
- }
- TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
- return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema,
- ptypes), new TupleDeepCopier(clazz, ptypes), ptypes);
- }
-
- private static Schema createTupleSchema(PType<?>... ptypes) {
- // Guarantee each tuple schema has a globally unique name
- String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
- Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
- List<Schema.Field> fields = Lists.newArrayList();
- for (int i = 0; i < ptypes.length; i++) {
- AvroType atype = (AvroType) ptypes[i];
- Schema fieldSchema = allowNulls(atype.getSchema());
- fields.add(new Schema.Field("v" + i, fieldSchema, "", null));
- }
- schema.setFields(fields);
- return schema;
- }
-
- public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
- PType<S> base) {
- AvroType<S> abase = (AvroType<S>) base;
- return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), inputFn),
- new CompositeMapFn(outputFn, abase.getOutputMapFn()), new DeepCopier.NoOpDeepCopier<T>(), base.getSubTypes()
- .toArray(new PType[0]));
- }
-
- public static <T> PType<T> jsons(Class<T> clazz) {
- return PTypes.jsonString(clazz, AvroTypeFamily.getInstance());
- }
-
- public static final <K, V> AvroTableType<K, V> tableOf(PType<K> key, PType<V> value) {
- if (key instanceof PTableType) {
- PTableType ptt = (PTableType) key;
- key = Avros.pairs(ptt.getKeyType(), ptt.getValueType());
- }
- if (value instanceof PTableType) {
- PTableType ptt = (PTableType) value;
- value = Avros.pairs(ptt.getKeyType(), ptt.getValueType());
- }
- AvroType<K> avroKey = (AvroType<K>) key;
- AvroType<V> avroValue = (AvroType<V>) value;
- return new AvroTableType(avroKey, avroValue, Pair.class);
- }
-
- private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
-
- private static Schema allowNulls(Schema base) {
- if (NULL_SCHEMA.equals(base)) {
- return base;
- }
- return Schema.createUnion(ImmutableList.of(base, NULL_SCHEMA));
- }
-
- private static class ReflectGenericRecord extends GenericData.Record {
-
- public ReflectGenericRecord(Schema schema) {
- super(schema);
- }
-
- @Override
- public int hashCode() {
- return reflectAwareHashCode(this, getSchema());
- }
- }
-
- /*
- * TODO: Remove this once we no longer have to support 1.5.4.
- */
- private static int reflectAwareHashCode(Object o, Schema s) {
- if (o == null)
- return 0; // incomplete datum
- int hashCode = 1;
- switch (s.getType()) {
- case RECORD:
- for (Schema.Field f : s.getFields()) {
- if (f.order() == Schema.Field.Order.IGNORE)
- continue;
- hashCode = hashCodeAdd(hashCode, ReflectData.get().getField(o, f.name(), f.pos()), f.schema());
- }
- return hashCode;
- case ARRAY:
- Collection<?> a = (Collection<?>) o;
- Schema elementType = s.getElementType();
- for (Object e : a)
- hashCode = hashCodeAdd(hashCode, e, elementType);
- return hashCode;
- case UNION:
- return reflectAwareHashCode(o, s.getTypes().get(ReflectData.get().resolveUnion(s, o)));
- case ENUM:
- return s.getEnumOrdinal(o.toString());
- case NULL:
- return 0;
- case STRING:
- return (o instanceof Utf8 ? o : new Utf8(o.toString())).hashCode();
- default:
- return o.hashCode();
- }
- }
-
- /** Add the hash code for an object into an accumulated hash code. */
- private static int hashCodeAdd(int hashCode, Object o, Schema s) {
- return 31 * hashCode + reflectAwareHashCode(o, s);
- }
-
- private Avros() {
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java b/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
deleted file mode 100644
index e973cca..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import org.apache.avro.Schema;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-
-/**
- * A Factory class for constructing Avro reflection-related objects.
- */
-public class ReflectDataFactory {
-
- public ReflectData getReflectData() {
- return ReflectData.AllowNull.get();
- }
-
- public <T> ReflectDatumReader<T> getReader(Schema schema) {
- return new ReflectDatumReader<T>(schema);
- }
-
- public <T> ReflectDatumWriter<T> getWriter(Schema schema) {
- return new ReflectDatumWriter<T>(schema);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
deleted file mode 100644
index 8bd18b0..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.avro.Schema;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.mapred.Pair;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/** The {@link Serialization} used by jobs configured with {@link AvroJob}. */
-class SafeAvroSerialization<T> extends Configured implements Serialization<AvroWrapper<T>> {
-
- public boolean accept(Class<?> c) {
- return AvroWrapper.class.isAssignableFrom(c);
- }
-
- /**
- * Returns the specified map output deserializer. Defaults to the final output
- * deserializer if no map output schema was specified.
- */
- public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) {
- boolean isKey = AvroKey.class.isAssignableFrom(c);
- Configuration conf = getConf();
- Schema schema = isKey ? Pair.getKeySchema(AvroJob.getMapOutputSchema(conf)) : Pair.getValueSchema(AvroJob
- .getMapOutputSchema(conf));
-
- DatumReader<T> datumReader = null;
- if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) {
- ReflectDataFactory factory = (ReflectDataFactory) ReflectionUtils.newInstance(
- conf.getClass("crunch.reflectdatafactory", ReflectDataFactory.class), conf);
- datumReader = factory.getReader(schema);
- } else {
- datumReader = new SpecificDatumReader<T>(schema);
- }
- return new AvroWrapperDeserializer(datumReader, isKey);
- }
-
- private static final DecoderFactory FACTORY = DecoderFactory.get();
-
- private class AvroWrapperDeserializer implements Deserializer<AvroWrapper<T>> {
-
- private DatumReader<T> reader;
- private BinaryDecoder decoder;
- private boolean isKey;
-
- public AvroWrapperDeserializer(DatumReader<T> reader, boolean isKey) {
- this.reader = reader;
- this.isKey = isKey;
- }
-
- public void open(InputStream in) {
- this.decoder = FACTORY.directBinaryDecoder(in, decoder);
- }
-
- public AvroWrapper<T> deserialize(AvroWrapper<T> wrapper) throws IOException {
- T datum = reader.read(wrapper == null ? null : wrapper.datum(), decoder);
- if (wrapper == null) {
- wrapper = isKey ? new AvroKey<T>(datum) : new AvroValue<T>(datum);
- } else {
- wrapper.datum(datum);
- }
- return wrapper;
- }
-
- public void close() throws IOException {
- decoder.inputStream().close();
- }
- }
-
- /** Returns the specified output serializer. */
- public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) {
- // AvroWrapper used for final output, AvroKey or AvroValue for map output
- boolean isFinalOutput = c.equals(AvroWrapper.class);
- Configuration conf = getConf();
- Schema schema = isFinalOutput ? AvroJob.getOutputSchema(conf) : (AvroKey.class.isAssignableFrom(c) ? Pair
- .getKeySchema(AvroJob.getMapOutputSchema(conf)) : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf)));
-
- ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
- ReflectDatumWriter<T> writer = factory.getWriter(schema);
- return new AvroWrapperSerializer(writer);
- }
-
- private class AvroWrapperSerializer implements Serializer<AvroWrapper<T>> {
- private DatumWriter<T> writer;
- private OutputStream out;
- private BinaryEncoder encoder;
-
- public AvroWrapperSerializer(DatumWriter<T> writer) {
- this.writer = writer;
- }
-
- public void open(OutputStream out) {
- this.out = out;
- this.encoder = new EncoderFactory().configureBlockSize(512).binaryEncoder(out, null);
- }
-
- public void serialize(AvroWrapper<T> wrapper) throws IOException {
- writer.write(wrapper.datum(), encoder);
- // would be a lot faster if the Serializer interface had a flush()
- // method and the Hadoop framework called it when needed rather
- // than for every record.
- encoder.flush();
- }
-
- public void close() throws IOException {
- out.close();
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/package-info.java b/crunch/src/main/java/org/apache/crunch/types/avro/package-info.java
deleted file mode 100644
index abaf60f..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Business object serialization using Apache Avro.
- */
-package org.apache.crunch.types.avro;
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/package-info.java b/crunch/src/main/java/org/apache/crunch/types/package-info.java
deleted file mode 100644
index b420b03..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Common functionality for business object serialization.
- */
-package org.apache.crunch.types;
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java b/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
deleted file mode 100644
index 8b54008..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableUtils;
-
-/**
- * A {@link Writable} for marshalling/unmarshalling Collections. Note that
- * element order is <em>undefined</em>!
- *
- * @param <T> The value type
- */
-class GenericArrayWritable<T> implements Writable {
- private Writable[] values;
- private Class<? extends Writable> valueClass;
-
- public GenericArrayWritable(Class<? extends Writable> valueClass) {
- this.valueClass = valueClass;
- }
-
- public GenericArrayWritable() {
- // for deserialization
- }
-
- public void set(Writable[] values) {
- this.values = values;
- }
-
- public Writable[] get() {
- return values;
- }
-
- public void readFields(DataInput in) throws IOException {
- values = new Writable[WritableUtils.readVInt(in)]; // construct values
- if (values.length > 0) {
- int nulls = WritableUtils.readVInt(in);
- if (nulls == values.length) {
- return;
- }
- String valueType = Text.readString(in);
- setValueType(valueType);
- for (int i = 0; i < values.length - nulls; i++) {
- Writable value = WritableFactories.newInstance(valueClass);
- value.readFields(in); // read a value
- values[i] = value; // store it in values
- }
- }
- }
-
- protected void setValueType(String valueType) {
- if (valueClass == null) {
- try {
- valueClass = Class.forName(valueType).asSubclass(Writable.class);
- } catch (ClassNotFoundException e) {
- throw new CrunchRuntimeException(e);
- }
- } else if (!valueType.equals(valueClass.getName())) {
- throw new IllegalStateException("Incoming " + valueType + " is not " + valueClass);
- }
- }
-
- public void write(DataOutput out) throws IOException {
- WritableUtils.writeVInt(out, values.length);
- if (values.length > 0) {
- int nulls = 0;
- for (int i = 0; i < values.length; i++) {
- if (values[i] == null) {
- nulls++;
- }
- }
- WritableUtils.writeVInt(out, nulls);
- if (values.length - nulls > 0) {
- if (valueClass == null) {
- throw new IllegalStateException("Value class not set by constructor or read");
- }
- Text.writeString(out, valueClass.getName());
- for (int i = 0; i < values.length; i++) {
- if (values[i] != null) {
- values[i].write(out);
- }
- }
- }
- }
- }
-
- @Override
- public int hashCode() {
- HashCodeBuilder hcb = new HashCodeBuilder();
- return hcb.append(values).toHashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- GenericArrayWritable other = (GenericArrayWritable) obj;
- if (!Arrays.equals(values, other.values))
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return Arrays.toString(values);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java b/crunch/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
deleted file mode 100644
index 1ab51df..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-import com.google.common.collect.Maps;
-
-class TextMapWritable<T extends Writable> implements Writable {
-
- private Class<T> valueClazz;
- private final Map<Text, T> instance;
-
- public TextMapWritable() {
- this.instance = Maps.newHashMap();
- }
-
- public TextMapWritable(Class<T> valueClazz) {
- this.valueClazz = valueClazz;
- this.instance = Maps.newHashMap();
- }
-
- public void put(Text txt, T value) {
- instance.put(txt, value);
- }
-
- public Set<Map.Entry<Text, T>> entrySet() {
- return instance.entrySet();
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- instance.clear();
- try {
- this.valueClazz = (Class<T>) Class.forName(Text.readString(in));
- } catch (ClassNotFoundException e) {
- throw (IOException) new IOException("Failed map init").initCause(e);
- }
- int entries = WritableUtils.readVInt(in);
- try {
- for (int i = 0; i < entries; i++) {
- Text txt = new Text();
- txt.readFields(in);
- T value = valueClazz.newInstance();
- value.readFields(in);
- instance.put(txt, value);
- }
- } catch (IllegalAccessException e) {
- throw (IOException) new IOException("Failed map init").initCause(e);
- } catch (InstantiationException e) {
- throw (IOException) new IOException("Failed map init").initCause(e);
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, valueClazz.getName());
- WritableUtils.writeVInt(out, instance.size());
- for (Map.Entry<Text, T> e : instance.entrySet()) {
- e.getKey().write(out);
- e.getValue().write(out);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
deleted file mode 100644
index 1c3536b..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableUtils;
-
-/**
- * A straight copy of the TupleWritable implementation in the join package,
- * added here because of its package visibility restrictions.
- *
- */
-public class TupleWritable implements WritableComparable<TupleWritable> {
-
- private long written;
- private Writable[] values;
-
- /**
- * Create an empty tuple with no allocated storage for writables.
- */
- public TupleWritable() {
- }
-
- /**
- * Initialize tuple with storage; unknown whether any of them contain
- * "written" values.
- */
- public TupleWritable(Writable[] vals) {
- written = 0L;
- values = vals;
- }
-
- /**
- * Return true if tuple has an element at the position provided.
- */
- public boolean has(int i) {
- return 0 != ((1 << i) & written);
- }
-
- /**
- * Get ith Writable from Tuple.
- */
- public Writable get(int i) {
- return values[i];
- }
-
- /**
- * The number of children in this Tuple.
- */
- public int size() {
- return values.length;
- }
-
- /**
- * {@inheritDoc}
- */
- public boolean equals(Object other) {
- if (other instanceof TupleWritable) {
- TupleWritable that = (TupleWritable) other;
- if (this.size() != that.size() || this.written != that.written) {
- return false;
- }
- for (int i = 0; i < values.length; ++i) {
- if (!has(i))
- continue;
- if (!values[i].equals(that.get(i))) {
- return false;
- }
- }
- return true;
- }
- return false;
- }
-
- public int hashCode() {
- HashCodeBuilder builder = new HashCodeBuilder();
- builder.append(written);
- for (Writable v : values) {
- builder.append(v);
- }
- return builder.toHashCode();
- }
-
- /**
- * Convert Tuple to String as in the following.
- * <tt>[<child1>,<child2>,...,<childn>]</tt>
- */
- public String toString() {
- StringBuffer buf = new StringBuffer("[");
- for (int i = 0; i < values.length; ++i) {
- buf.append(has(i) ? values[i].toString() : "");
- buf.append(",");
- }
- if (values.length != 0)
- buf.setCharAt(buf.length() - 1, ']');
- else
- buf.append(']');
- return buf.toString();
- }
-
- /**
- * Writes each Writable to <code>out</code>. TupleWritable format:
- * {@code
- * <count><type1><type2>...<typen><obj1><obj2>...<objn>
- * }
- */
- public void write(DataOutput out) throws IOException {
- WritableUtils.writeVInt(out, values.length);
- WritableUtils.writeVLong(out, written);
- for (int i = 0; i < values.length; ++i) {
- if (has(i)) {
- Text.writeString(out, values[i].getClass().getName());
- }
- }
- for (int i = 0; i < values.length; ++i) {
- if (has(i)) {
- values[i].write(out);
- }
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @SuppressWarnings("unchecked")
- // No static typeinfo on Tuples
- public void readFields(DataInput in) throws IOException {
- int card = WritableUtils.readVInt(in);
- values = new Writable[card];
- written = WritableUtils.readVLong(in);
- Class<? extends Writable>[] cls = new Class[card];
- try {
- for (int i = 0; i < card; ++i) {
- if (has(i)) {
- cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
- }
- }
- for (int i = 0; i < card; ++i) {
- if (has(i)) {
- values[i] = cls[i].newInstance();
- values[i].readFields(in);
- }
- }
- } catch (ClassNotFoundException e) {
- throw (IOException) new IOException("Failed tuple init").initCause(e);
- } catch (IllegalAccessException e) {
- throw (IOException) new IOException("Failed tuple init").initCause(e);
- } catch (InstantiationException e) {
- throw (IOException) new IOException("Failed tuple init").initCause(e);
- }
- }
-
- /**
- * Record that the tuple contains an element at the position provided.
- */
- public void setWritten(int i) {
- written |= 1 << i;
- }
-
- /**
- * Record that the tuple does not contain an element at the position provided.
- */
- public void clearWritten(int i) {
- written &= -1 ^ (1 << i);
- }
-
- /**
- * Clear any record of which writables have been written to, without releasing
- * storage.
- */
- public void clearWritten() {
- written = 0L;
- }
-
- @Override
- public int compareTo(TupleWritable o) {
- for (int i = 0; i < values.length; ++i) {
- if (has(i) && !o.has(i)) {
- return 1;
- } else if (!has(i) && o.has(i)) {
- return -1;
- } else {
- Writable v1 = values[i];
- Writable v2 = o.values[i];
- if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
- if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
- int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
- if (cmp != 0) {
- return cmp;
- }
- } else {
- int cmp = v1.hashCode() - v2.hashCode();
- if (cmp != 0) {
- return cmp;
- }
- }
- }
- }
- }
- return values.length - o.values.length;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
deleted file mode 100644
index 7b6e11b..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.types.DeepCopier;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Performs deep copies of Writable values.
- *
- * @param <T> The type of Writable that can be copied
- */
-public class WritableDeepCopier<T extends Writable> implements DeepCopier<T> {
-
- private Class<T> writableClass;
-
- public WritableDeepCopier(Class<T> writableClass) {
- this.writableClass = writableClass;
- }
-
- @Override
- public void initialize(Configuration conf) {
- }
-
- @Override
- public T deepCopy(T source) {
-
- if (source == null) {
- return null;
- }
-
- ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
- DataOutputStream dataOut = new DataOutputStream(byteOutStream);
- T copiedValue = null;
- try {
- source.write(dataOut);
- dataOut.flush();
- ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
- DataInput dataInput = new DataInputStream(byteInStream);
- copiedValue = writableClass.newInstance();
- copiedValue.readFields(dataInput);
- } catch (Exception e) {
- throw new CrunchRuntimeException("Error while deep copying " + source, e);
- }
- return copiedValue;
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
deleted file mode 100644
index 84318d3..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
-
- private final MapFn inputFn;
- private final MapFn outputFn;
- private final Converter converter;
-
- public WritableGroupedTableType(WritableTableType<K, V> tableType) {
- super(tableType);
- WritableType keyType = (WritableType) tableType.getKeyType();
- WritableType valueType = (WritableType) tableType.getValueType();
- this.inputFn = new PairIterableMapFn(keyType.getInputMapFn(), valueType.getInputMapFn());
- this.outputFn = tableType.getOutputMapFn();
- this.converter = new WritablePairConverter(keyType.getSerializationClass(),
- valueType.getSerializationClass());
- }
-
- @Override
- public Class<Pair<K, Iterable<V>>> getTypeClass() {
- return (Class<Pair<K, Iterable<V>>>) Pair.of(null, null).getClass();
- }
-
- @Override
- public Converter getGroupingConverter() {
- return converter;
- }
-
- @Override
- public MapFn getInputMapFn() {
- return inputFn;
- }
-
- @Override
- public MapFn getOutputMapFn() {
- return outputFn;
- }
-
- @Override
- public void initialize(Configuration conf) {
- this.tableType.initialize(conf);
- }
-
- @Override
- public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
- return PTables.getGroupedDetachedValue(this, value);
- }
-
- @Override
- public void configureShuffle(Job job, GroupingOptions options) {
- if (options != null) {
- options.configure(job);
- }
- WritableType keyType = (WritableType) tableType.getKeyType();
- WritableType valueType = (WritableType) tableType.getValueType();
- job.setMapOutputKeyClass(keyType.getSerializationClass());
- job.setMapOutputValueClass(valueType.getSerializationClass());
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
deleted file mode 100644
index 2db0238..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.Converter;
-
-class WritablePairConverter<K, V> implements Converter<K, V, Pair<K, V>, Pair<K, Iterable<V>>> {
-
- private final Class<K> keyClass;
- private final Class<V> valueClass;
-
- public WritablePairConverter(Class<K> keyClass, Class<V> valueClass) {
- this.keyClass = keyClass;
- this.valueClass = valueClass;
- }
-
- @Override
- public Pair<K, V> convertInput(K key, V value) {
- return Pair.of(key, value);
- }
-
- @Override
- public K outputKey(Pair<K, V> value) {
- return value.first();
- }
-
- @Override
- public V outputValue(Pair<K, V> value) {
- return value.second();
- }
-
- @Override
- public Class<K> getKeyClass() {
- return keyClass;
- }
-
- @Override
- public Class<V> getValueClass() {
- return valueClass;
- }
-
- @Override
- public Pair<K, Iterable<V>> convertIterableInput(K key, Iterable<V> value) {
- return Pair.of(key, value);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
deleted file mode 100644
index 93e0fd6..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.util.List;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.fn.PairMapFn;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.ImmutableList;
-
-class WritableTableType<K, V> implements PTableType<K, V> {
-
- private final WritableType<K, Writable> keyType;
- private final WritableType<V, Writable> valueType;
- private final MapFn inputFn;
- private final MapFn outputFn;
- private final Converter converter;
-
- public WritableTableType(WritableType<K, Writable> keyType, WritableType<V, Writable> valueType) {
- this.keyType = keyType;
- this.valueType = valueType;
- this.inputFn = new PairMapFn(keyType.getInputMapFn(), valueType.getInputMapFn());
- this.outputFn = new PairMapFn(keyType.getOutputMapFn(), valueType.getOutputMapFn());
- this.converter = new WritablePairConverter(keyType.getSerializationClass(),
- valueType.getSerializationClass());
- }
-
- @Override
- public Class<Pair<K, V>> getTypeClass() {
- return (Class<Pair<K, V>>) Pair.of(null, null).getClass();
- }
-
- @Override
- public List<PType> getSubTypes() {
- return ImmutableList.<PType> of(keyType, valueType);
- }
-
- @Override
- public MapFn getInputMapFn() {
- return inputFn;
- }
-
- @Override
- public MapFn getOutputMapFn() {
- return outputFn;
- }
-
- @Override
- public Converter getConverter() {
- return converter;
- }
-
- @Override
- public PTypeFamily getFamily() {
- return WritableTypeFamily.getInstance();
- }
-
- public PType<K> getKeyType() {
- return keyType;
- }
-
- public PType<V> getValueType() {
- return valueType;
- }
-
- @Override
- public PGroupedTableType<K, V> getGroupedTableType() {
- return new WritableGroupedTableType<K, V>(this);
- }
-
- @Override
- public ReadableSourceTarget<Pair<K, V>> getDefaultFileSource(Path path) {
- return new SeqFileTableSourceTarget<K, V>(path, this);
- }
-
- @Override
- public void initialize(Configuration conf) {
- keyType.initialize(conf);
- valueType.initialize(conf);
- }
-
- @Override
- public Pair<K, V> getDetachedValue(Pair<K, V> value) {
- return PTables.getDetachedValue(this, value);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null || !(obj instanceof WritableTableType)) {
- return false;
- }
- WritableTableType that = (WritableTableType) obj;
- return keyType.equals(that.keyType) && valueType.equals(that.valueType);
- }
-
- @Override
- public int hashCode() {
- HashCodeBuilder hcb = new HashCodeBuilder();
- return hcb.append(keyType).append(valueType).toHashCode();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
deleted file mode 100644
index 734946c..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.util.List;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.io.seq.SeqFileSourceTarget;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.DeepCopier;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.ImmutableList;
-
-public class WritableType<T, W extends Writable> implements PType<T> {
-
- private final Class<T> typeClass;
- private final Class<W> writableClass;
- private final Converter converter;
- private final MapFn<W, T> inputFn;
- private final MapFn<T, W> outputFn;
- private final DeepCopier<W> deepCopier;
- private final List<PType> subTypes;
- private boolean initialized = false;
-
- public WritableType(Class<T> typeClass, Class<W> writableClass, MapFn<W, T> inputDoFn,
- MapFn<T, W> outputDoFn, PType... subTypes) {
- this.typeClass = typeClass;
- this.writableClass = writableClass;
- this.inputFn = inputDoFn;
- this.outputFn = outputDoFn;
- this.converter = new WritableValueConverter(writableClass);
- this.deepCopier = new WritableDeepCopier<W>(writableClass);
- this.subTypes = ImmutableList.<PType> builder().add(subTypes).build();
- }
-
- @Override
- public PTypeFamily getFamily() {
- return WritableTypeFamily.getInstance();
- }
-
- @Override
- public Class<T> getTypeClass() {
- return typeClass;
- }
-
- @Override
- public Converter getConverter() {
- return converter;
- }
-
- @Override
- public MapFn getInputMapFn() {
- return inputFn;
- }
-
- @Override
- public MapFn getOutputMapFn() {
- return outputFn;
- }
-
- @Override
- public List<PType> getSubTypes() {
- return subTypes;
- }
-
- public Class<W> getSerializationClass() {
- return writableClass;
- }
-
- @Override
- public ReadableSourceTarget<T> getDefaultFileSource(Path path) {
- return new SeqFileSourceTarget<T>(path, this);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null || !(obj instanceof WritableType)) {
- return false;
- }
- WritableType wt = (WritableType) obj;
- return (typeClass.equals(wt.typeClass) && writableClass.equals(wt.writableClass) && subTypes
- .equals(wt.subTypes));
- }
-
- @Override
- public void initialize(Configuration conf) {
- this.inputFn.initialize();
- this.outputFn.initialize();
- for (PType subType : subTypes) {
- subType.initialize(conf);
- }
- this.initialized = true;
- }
-
- @Override
- public T getDetachedValue(T value) {
- if (!initialized) {
- throw new IllegalStateException("Cannot call getDetachedValue on an uninitialized PType");
- }
- W writableValue = outputFn.map(value);
- W deepCopy = this.deepCopier.deepCopy(writableValue);
- return inputFn.map(deepCopy);
- }
-
- @Override
- public int hashCode() {
- HashCodeBuilder hcb = new HashCodeBuilder();
- hcb.append(typeClass).append(writableClass).append(subTypes);
- return hcb.toHashCode();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
deleted file mode 100644
index a94db96..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.PTypeUtils;
-import org.apache.hadoop.io.Writable;
-
-/**
- * The {@link Writable}-based implementation of the
- * {@link org.apache.crunch.types.PTypeFamily} interface.
- */
-public class WritableTypeFamily implements PTypeFamily {
-
- private static final WritableTypeFamily INSTANCE = new WritableTypeFamily();
-
- public static WritableTypeFamily getInstance() {
- return INSTANCE;
- }
-
- // Disallow construction
- private WritableTypeFamily() {
- }
-
- public PType<Void> nulls() {
- return Writables.nulls();
- }
-
- public PType<String> strings() {
- return Writables.strings();
- }
-
- public PType<Long> longs() {
- return Writables.longs();
- }
-
- public PType<Integer> ints() {
- return Writables.ints();
- }
-
- public PType<Float> floats() {
- return Writables.floats();
- }
-
- public PType<Double> doubles() {
- return Writables.doubles();
- }
-
- public PType<Boolean> booleans() {
- return Writables.booleans();
- }
-
- public PType<ByteBuffer> bytes() {
- return Writables.bytes();
- }
-
- public <T> PType<T> records(Class<T> clazz) {
- return Writables.records(clazz);
- }
-
- public <W extends Writable> PType<W> writables(Class<W> clazz) {
- return Writables.writables(clazz);
- }
-
- public <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value) {
- return Writables.tableOf(key, value);
- }
-
- public <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
- return Writables.pairs(p1, p2);
- }
-
- public <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
- return Writables.triples(p1, p2, p3);
- }
-
- public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) {
- return Writables.quads(p1, p2, p3, p4);
- }
-
- public PType<TupleN> tuples(PType<?>... ptypes) {
- return Writables.tuples(ptypes);
- }
-
- public <T> PType<Collection<T>> collections(PType<T> ptype) {
- return Writables.collections(ptype);
- }
-
- public <T> PType<Map<String, T>> maps(PType<T> ptype) {
- return Writables.maps(ptype);
- }
-
- @Override
- public <T> PType<T> as(PType<T> ptype) {
- if (ptype instanceof WritableType || ptype instanceof WritableTableType
- || ptype instanceof WritableGroupedTableType) {
- return ptype;
- }
- if (ptype instanceof PGroupedTableType) {
- PTableType ptt = ((PGroupedTableType) ptype).getTableType();
- return new WritableGroupedTableType((WritableTableType) as(ptt));
- }
- PType<T> prim = Writables.getPrimitiveType(ptype.getTypeClass());
- if (prim != null) {
- return prim;
- }
- return PTypeUtils.convert(ptype, this);
- }
-
- @Override
- public <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes) {
- return Writables.tuples(clazz, ptypes);
- }
-
- @Override
- public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
- return Writables.derived(clazz, inputFn, outputFn, base);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
deleted file mode 100644
index 3670b90..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import org.apache.crunch.types.Converter;
-import org.apache.hadoop.io.NullWritable;
-
-class WritableValueConverter<W> implements Converter<Object, W, W, Iterable<W>> {
-
- private final Class<W> serializationClass;
-
- public WritableValueConverter(Class<W> serializationClass) {
- this.serializationClass = serializationClass;
- }
-
- @Override
- public W convertInput(Object key, W value) {
- return value;
- }
-
- @Override
- public Object outputKey(W value) {
- return NullWritable.get();
- }
-
- @Override
- public W outputValue(W value) {
- return value;
- }
-
- @Override
- public Class<Object> getKeyClass() {
- return (Class<Object>) (Class<?>) NullWritable.class;
- }
-
- @Override
- public Class<W> getValueClass() {
- return serializationClass;
- }
-
- @Override
- public Iterable<W> convertIterableInput(Object key, Iterable<W> value) {
- return value;
- }
-}
\ No newline at end of file