You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:08 UTC
[06/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java b/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java
deleted file mode 100644
index e61b98b..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-
-/**
- * Static helpers for rebuilding a {@code PType} with the factory methods of a
- * different {@code PTypeFamily}.
- */
-public class PTypeUtils {
-
-  /** Utility class; not meant to be instantiated. */
-  private PTypeUtils() {
-  }
-
-  /**
-   * Re-creates {@code ptype} inside the type family {@code tf}.
-   * <p>
-   * The checks are applied in this order:
-   * <ol>
-   * <li>a {@code PTableType} becomes {@code tf.tableOf(...)} of its converted
-   * key and value types;</li>
-   * <li>a type whose class is exactly {@code Pair}, {@code Tuple3},
-   * {@code Tuple4} or {@code TupleN} becomes the matching tuple type built from
-   * its converted sub-types;</li>
-   * <li>a type whose class is a {@code Collection} becomes
-   * {@code tf.collections(...)} of its first converted sub-type;</li>
-   * <li>anything else (including other {@code Tuple} subclasses) is handed to
-   * {@code tf.records(...)}.</li>
-   * </ol>
-   *
-   * @param ptype the type to convert
-   * @param tf the family that should produce the converted type
-   * @return the type produced by {@code tf}
-   */
-  public static <T> PType<T> convert(PType<T> ptype, PTypeFamily tf) {
-    if (ptype instanceof PTableType) {
-      PTableType tableType = (PTableType) ptype;
-      PType keyType = tf.as(tableType.getKeyType());
-      PType valueType = tf.as(tableType.getValueType());
-      return tf.tableOf(keyType, valueType);
-    }
-
-    Class<T> typeClass = ptype.getTypeClass();
-
-    if (Tuple.class.isAssignableFrom(typeClass)) {
-      List<PType> parts = ptype.getSubTypes();
-      if (typeClass.equals(Pair.class)) {
-        PType first = tf.as(parts.get(0));
-        PType second = tf.as(parts.get(1));
-        return tf.pairs(first, second);
-      }
-      if (typeClass.equals(Tuple3.class)) {
-        PType first = tf.as(parts.get(0));
-        PType second = tf.as(parts.get(1));
-        PType third = tf.as(parts.get(2));
-        return tf.triples(first, second, third);
-      }
-      if (typeClass.equals(Tuple4.class)) {
-        PType first = tf.as(parts.get(0));
-        PType second = tf.as(parts.get(1));
-        PType third = tf.as(parts.get(2));
-        PType fourth = tf.as(parts.get(3));
-        return tf.quads(first, second, third, fourth);
-      }
-      if (typeClass.equals(TupleN.class)) {
-        PType[] converted = new PType[parts.size()];
-        int slot = 0;
-        for (PType part : parts) {
-          converted[slot++] = tf.as(part);
-        }
-        return (PType<T>) tf.tuples(converted);
-      }
-      // Any other Tuple subclass deliberately falls through to the checks below.
-    }
-
-    if (Collection.class.isAssignableFrom(typeClass)) {
-      PType elementType = tf.as(ptype.getSubTypes().get(0));
-      return tf.collections(elementType);
-    }
-
-    return tf.records(typeClass);
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/PTypes.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PTypes.java b/crunch/src/main/java/org/apache/crunch/types/PTypes.java
deleted file mode 100644
index 546719c..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/PTypes.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.MapFn;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-
-/**
- * Utility functions for creating common types of derived PTypes, e.g., for JSON
- * data, protocol buffers, and Thrift records.
- *
- */
-public class PTypes {
-
- public static PType<BigInteger> bigInt(PTypeFamily typeFamily) {
- return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes());
- }
-
- public static PType<UUID> uuid(PTypeFamily ptf) {
- return ptf.derived(UUID.class, BYTE_TO_UUID, UUID_TO_BYTE, ptf.bytes());
- }
-
- public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) {
- return typeFamily
- .derived(clazz, new JacksonInputMapFn<T>(clazz), new JacksonOutputMapFn<T>(), typeFamily.strings());
- }
-
- public static <T extends Message> PType<T> protos(Class<T> clazz, PTypeFamily typeFamily) {
- return typeFamily.derived(clazz, new ProtoInputMapFn<T>(clazz), new ProtoOutputMapFn<T>(), typeFamily.bytes());
- }
-
- public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily typeFamily) {
- return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz), new ThriftOutputMapFn<T>(), typeFamily.bytes());
- }
-
- public static final <T extends Enum> PType<T> enums(final Class<T> type, PTypeFamily typeFamily) {
- return typeFamily.derived(type, new EnumInputMapper<T>(type), new EnumOutputMapper<T>(), typeFamily.strings());
- }
-
- public static MapFn<ByteBuffer, BigInteger> BYTE_TO_BIGINT = new MapFn<ByteBuffer, BigInteger>() {
- public BigInteger map(ByteBuffer input) {
- return input == null ? null : new BigInteger(input.array());
- }
- };
-
- public static MapFn<BigInteger, ByteBuffer> BIGINT_TO_BYTE = new MapFn<BigInteger, ByteBuffer>() {
- public ByteBuffer map(BigInteger input) {
- return input == null ? null : ByteBuffer.wrap(input.toByteArray());
- }
- };
-
- private static class JacksonInputMapFn<T> extends MapFn<String, T> {
-
- private final Class<T> clazz;
- private transient ObjectMapper mapper;
-
- public JacksonInputMapFn(Class<T> clazz) {
- this.clazz = clazz;
- }
-
- @Override
- public void initialize() {
- this.mapper = new ObjectMapper();
- }
-
- @Override
- public T map(String input) {
- try {
- return mapper.readValue(input, clazz);
- } catch (Exception e) {
- throw new CrunchRuntimeException(e);
- }
- }
- }
-
- private static class JacksonOutputMapFn<T> extends MapFn<T, String> {
-
- private transient ObjectMapper mapper;
-
- @Override
- public void initialize() {
- this.mapper = new ObjectMapper();
- }
-
- @Override
- public String map(T input) {
- try {
- return mapper.writeValueAsString(input);
- } catch (Exception e) {
- throw new CrunchRuntimeException(e);
- }
- }
- }
-
- private static class ProtoInputMapFn<T extends Message> extends MapFn<ByteBuffer, T> {
-
- private final Class<T> clazz;
- private transient T instance;
-
- public ProtoInputMapFn(Class<T> clazz) {
- this.clazz = clazz;
- }
-
- @Override
- public void initialize() {
- this.instance = Protos.getDefaultInstance(clazz);
- }
-
- @Override
- public T map(ByteBuffer bb) {
- try {
- return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit()).build();
- } catch (InvalidProtocolBufferException e) {
- throw new CrunchRuntimeException(e);
- }
- }
- }
-
- private static class ProtoOutputMapFn<T extends Message> extends MapFn<T, ByteBuffer> {
-
- public ProtoOutputMapFn() {
- }
-
- @Override
- public ByteBuffer map(T proto) {
- return ByteBuffer.wrap(proto.toByteArray());
- }
- }
-
- private static class ThriftInputMapFn<T extends TBase> extends MapFn<ByteBuffer, T> {
-
- private final Class<T> clazz;
- private transient T instance;
- private transient TDeserializer deserializer;
- private transient byte[] bytes;
-
- public ThriftInputMapFn(Class<T> clazz) {
- this.clazz = clazz;
- }
-
- @Override
- public void initialize() {
- this.instance = ReflectionUtils.newInstance(clazz, null);
- this.deserializer = new TDeserializer(new TBinaryProtocol.Factory());
- this.bytes = new byte[0];
- }
-
- @Override
- public T map(ByteBuffer bb) {
- T next = (T) instance.deepCopy();
- int len = bb.limit() - bb.position();
- if (len != bytes.length) {
- bytes = new byte[len];
- }
- System.arraycopy(bb.array(), bb.position(), bytes, 0, len);
- try {
- deserializer.deserialize(next, bytes);
- } catch (TException e) {
- throw new CrunchRuntimeException(e);
- }
- return next;
- }
- }
-
- private static class ThriftOutputMapFn<T extends TBase> extends MapFn<T, ByteBuffer> {
-
- private transient TSerializer serializer;
-
- public ThriftOutputMapFn() {
- }
-
- @Override
- public void initialize() {
- this.serializer = new TSerializer(new TBinaryProtocol.Factory());
- }
-
- @Override
- public ByteBuffer map(T t) {
- try {
- return ByteBuffer.wrap(serializer.serialize(t));
- } catch (TException e) {
- throw new CrunchRuntimeException(e);
- }
- }
- }
-
- private static class EnumInputMapper<T extends Enum> extends MapFn<String, T> {
- private final Class<T> type;
-
- public EnumInputMapper(Class<T> type) {
- this.type = type;
- }
-
- @Override
- public T map(String input) {
- return (T) Enum.valueOf(type, input);
- }
- };
-
- private static class EnumOutputMapper<T extends Enum> extends MapFn<T, String> {
-
- @Override
- public String map(T input) {
- return input.name();
- }
- };
-
- private static MapFn<ByteBuffer, UUID> BYTE_TO_UUID = new MapFn<ByteBuffer, UUID>() {
- @Override
- public UUID map(ByteBuffer input) {
- return new UUID(input.getLong(), input.getLong());
- }
- };
-
- private static MapFn<UUID, ByteBuffer> UUID_TO_BYTE = new MapFn<UUID, ByteBuffer>() {
- @Override
- public ByteBuffer map(UUID input) {
- ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
- bb.asLongBuffer().put(input.getMostSignificantBits()).put(input.getLeastSignificantBits());
- return bb;
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/Protos.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/Protos.java b/crunch/src/main/java/org/apache/crunch/types/Protos.java
deleted file mode 100644
index 4cd5068..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/Protos.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.MapFn;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.base.Splitter;
-import com.google.protobuf.Descriptors.FieldDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
-
-/**
- * Utility functions for working with protocol buffers in Crunch.
- */
-public class Protos {
-
-  /**
-   * Utility function for creating a default PB Message from a Class object that
-   * works with both protoc 2.3.0 and 2.4.x.
-   * <p>
-   * If the class exposes a public constructor it is instantiated reflectively;
-   * otherwise the static {@code newBuilder()} method is invoked and the
-   * builder's default instance is returned.
-   *
-   * @param clazz The class of the protocol buffer to create
-   * @return An instance of a protocol buffer
-   * @throws CrunchRuntimeException if the {@code newBuilder()} lookup or call fails
-   */
-  public static <M extends Message> M getDefaultInstance(Class<M> clazz) {
-    if (clazz.getConstructors().length > 0) {
-      // Protobuf 2.3.0
-      return ReflectionUtils.newInstance(clazz, null);
-    } else {
-      // Protobuf 2.4.x
-      try {
-        Message.Builder mb = (Message.Builder) clazz.getDeclaredMethod("newBuilder").invoke(null);
-        return (M) mb.getDefaultInstanceForType();
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-
-  /**
-   * @param fieldName name of the message field to read
-   * @return a function that returns the value of {@code fieldName} from each
-   *         message it is given
-   */
-  public static <M extends Message, K> MapFn<M, K> extractKey(String fieldName) {
-    return new ExtractKeyFn<M, K>(fieldName);
-  }
-
-  /**
-   * @param sep separator between the field values on a line
-   * @param msgClass the message class to build
-   * @return a function that turns each separated text line into a message,
-   *         assigning the n-th token to the n-th field of the descriptor
-   */
-  public static <M extends Message> DoFn<String, M> lineParser(String sep, Class<M> msgClass) {
-    return new TextToProtoFn<M>(sep, msgClass);
-  }
-
-  /** Reads one named field out of each input message. */
-  private static class ExtractKeyFn<M extends Message, K> extends MapFn<M, K> {
-
-    private final String fieldName;
-
-    // Resolved from the first message seen and reused afterwards.
-    private transient FieldDescriptor fd;
-
-    public ExtractKeyFn(String fieldName) {
-      this.fieldName = fieldName;
-    }
-
-    @Override
-    public K map(M input) {
-      if (input == null) {
-        throw new IllegalArgumentException("Null inputs not supported by Protos.ExtractKeyFn");
-      } else if (fd == null) {
-        fd = input.getDescriptorForType().findFieldByName(fieldName);
-        if (fd == null) {
-          throw new IllegalStateException("Could not find field: " + fieldName + " in message: " + input);
-        }
-      }
-      return (K) input.getField(fd);
-    }
-
-  }
-
-  /**
-   * Builds a message from a separated text line. Lines with a value that cannot
-   * be parsed are counted and dropped instead of being emitted.
-   */
-  private static class TextToProtoFn<M extends Message> extends DoFn<String, M> {
-
-    private final String sep;
-    private final Class<M> msgClass;
-
-    // Derived from sep/msgClass in initialize(); not serialized with the function.
-    private transient M msgInstance;
-    private transient List<FieldDescriptor> fields;
-    private transient Splitter splitter;
-
-    /** Counters: TOTAL for every dropped line, NUMBER_FORMAT for bad numeric tokens. */
-    enum ParseErrors {
-      TOTAL,
-      NUMBER_FORMAT
-    };
-
-    public TextToProtoFn(String sep, Class<M> msgClass) {
-      this.sep = sep;
-      this.msgClass = msgClass;
-    }
-
-    @Override
-    public void initialize() {
-      this.msgInstance = getDefaultInstance(msgClass);
-      this.fields = msgInstance.getDescriptorForType().getFields();
-      this.splitter = Splitter.on(sep);
-    }
-
-    /**
-     * Parses one line. Null or empty lines are ignored; empty tokens leave the
-     * corresponding field unset; tokens beyond the last field are ignored.
-     *
-     * @throws UnsupportedOperationException if a non-empty token lines up with a
-     *         field whose Java type cannot be read from text
-     */
-    @Override
-    public void process(String input, Emitter<M> emitter) {
-      if (input == null || input.isEmpty()) {
-        return;
-      }
-
-      Builder b = msgInstance.newBuilderForType();
-      Iterator<String> iter = splitter.split(input).iterator();
-      boolean parseError = false;
-      for (FieldDescriptor fd : fields) {
-        if (!iter.hasNext()) {
-          // Fewer tokens than fields: the remaining fields stay unset.
-          break;
-        }
-        String value = iter.next();
-        if (value == null || value.isEmpty()) {
-          continue;
-        }
-        try {
-          Object parsedValue = parseValue(fd, value);
-          if (parsedValue == null) {
-            // Only an unknown enum name yields null. Passing null on to setField
-            // would fail inside protobuf, so the line is dropped as a parse error.
-            parseError = true;
-            break;
-          }
-          b.setField(fd, parsedValue);
-        } catch (NumberFormatException nfe) {
-          increment(ParseErrors.NUMBER_FORMAT);
-          parseError = true;
-          break;
-        }
-      }
-
-      if (parseError) {
-        increment(ParseErrors.TOTAL);
-      } else {
-        emitter.emit((M) b.build());
-      }
-    }
-
-    /**
-     * Converts one text token to the Java value expected by {@code fd}.
-     *
-     * @return the parsed value, or null when {@code fd} is an enum field and
-     *         {@code value} is not one of its value names
-     * @throws NumberFormatException if a numeric token is malformed
-     * @throws UnsupportedOperationException for field types with no text form here
-     */
-    private static Object parseValue(FieldDescriptor fd, String value) {
-      switch (fd.getJavaType()) {
-      case STRING:
-        return value;
-      case INT:
-        return Integer.valueOf(value);
-      case LONG:
-        return Long.valueOf(value);
-      case FLOAT:
-        return Float.valueOf(value);
-      case DOUBLE:
-        return Double.valueOf(value);
-      case BOOLEAN:
-        return Boolean.valueOf(value);
-      case ENUM:
-        return fd.getEnumType().findValueByName(value);
-      default:
-        // Fail with a clear message instead of handing protobuf a null value.
-        throw new UnsupportedOperationException("Cannot parse text into field " + fd.getFullName()
-            + " of type " + fd.getJavaType());
-      }
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
deleted file mode 100644
index a2ffae3..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.util.List;
-
-import org.apache.crunch.Tuple;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Lists;
-
-/**
- * Performs deep copies (based on underlying PType deep copying) of Tuple-based objects.
- *
- * @param <T> The type of Tuple implementation being copied
- */
-public class TupleDeepCopier<T extends Tuple> implements DeepCopier<T> {
-
-  private final TupleFactory<T> tupleFactory;
-  private final List<PType> elementTypes;
-
-  /**
-   * @param tupleClass the tuple implementation to produce; must have a factory
-   *        known to {@code TupleFactory.getTupleFactory}
-   * @param elementTypes the types of the tuple's elements, in positional order
-   */
-  public TupleDeepCopier(Class<T> tupleClass, PType... elementTypes) {
-    tupleFactory = TupleFactory.getTupleFactory(tupleClass);
-    this.elementTypes = Lists.newArrayList(elementTypes);
-  }
-
-  /**
-   * Prepares the tuple factory and every element type for use.
-   *
-   * @param conf the configuration passed on to each element type
-   */
-  @Override
-  public void initialize(Configuration conf) {
-    // The factory has its own initialize() hook; without this call a factory
-    // that sets itself up there is still unprepared when deepCopy() runs.
-    tupleFactory.initialize();
-    for (PType elementType : elementTypes) {
-      elementType.initialize(conf);
-    }
-  }
-
-  /**
-   * Copies a tuple element by element, using each element type's
-   * {@code getDetachedValue}.
-   *
-   * @param source the tuple to copy; may be null
-   * @return a new tuple holding the detached values, or null if {@code source}
-   *         is null
-   */
-  @Override
-  public T deepCopy(T source) {
-
-    if (source == null) {
-      return null;
-    }
-
-    // Sized from the tuple, filled from the declared element types.
-    Object[] deepCopyValues = new Object[source.size()];
-
-    for (int valueIndex = 0; valueIndex < elementTypes.size(); valueIndex++) {
-      PType elementType = elementTypes.get(valueIndex);
-      deepCopyValues[valueIndex] = elementType.getDetachedValue(source.get(valueIndex));
-    }
-
-    return tupleFactory.makeTuple(deepCopyValues);
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
deleted file mode 100644
index 73b47de..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.io.Serializable;
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-
-import com.google.common.collect.Maps;
-
-public abstract class TupleFactory<T extends Tuple> implements Serializable {
-
- public void initialize() {
- }
-
- public abstract T makeTuple(Object... values);
-
-
- private static final Map<Class, TupleFactory> customTupleFactories = Maps.newHashMap();
-
- /**
- * Get the {@link TupleFactory} for a given Tuple implementation.
- *
- * @param tupleClass
- * The class for which the factory is to be retrieved
- * @return The appropriate TupleFactory
- */
- public static <T extends Tuple> TupleFactory<T> getTupleFactory(Class<T> tupleClass) {
- if (tupleClass == Pair.class) {
- return (TupleFactory<T>) PAIR;
- } else if (tupleClass == Tuple3.class) {
- return (TupleFactory<T>) TUPLE3;
- } else if (tupleClass == Tuple4.class) {
- return (TupleFactory<T>) TUPLE4;
- } else if (tupleClass == TupleN.class) {
- return (TupleFactory<T>) TUPLEN;
- } else if (customTupleFactories.containsKey(tupleClass)) {
- return (TupleFactory<T>) customTupleFactories.get(tupleClass);
- } else {
- throw new IllegalArgumentException("Can't create TupleFactory for " + tupleClass);
- }
- }
-
- public static final TupleFactory<Pair> PAIR = new TupleFactory<Pair>() {
- @Override
- public Pair makeTuple(Object... values) {
- return Pair.of(values[0], values[1]);
- }
- };
-
- public static final TupleFactory<Tuple3> TUPLE3 = new TupleFactory<Tuple3>() {
- @Override
- public Tuple3 makeTuple(Object... values) {
- return Tuple3.of(values[0], values[1], values[2]);
- }
- };
-
- public static final TupleFactory<Tuple4> TUPLE4 = new TupleFactory<Tuple4>() {
- @Override
- public Tuple4 makeTuple(Object... values) {
- return Tuple4.of(values[0], values[1], values[2], values[3]);
- }
- };
-
- public static final TupleFactory<TupleN> TUPLEN = new TupleFactory<TupleN>() {
- @Override
- public TupleN makeTuple(Object... values) {
- return new TupleN(values);
- }
- };
-
- public static <T extends Tuple> TupleFactory<T> create(Class<T> clazz, Class... typeArgs) {
- if (customTupleFactories.containsKey(clazz)) {
- return (TupleFactory<T>) customTupleFactories.get(clazz);
- }
- TupleFactory<T> custom = new CustomTupleFactory<T>(clazz, typeArgs);
- customTupleFactories.put(clazz, custom);
- return custom;
- }
-
- private static class CustomTupleFactory<T extends Tuple> extends TupleFactory<T> {
-
- private final Class<T> clazz;
- private final Class[] typeArgs;
-
- private transient Constructor<T> constructor;
-
- public CustomTupleFactory(Class<T> clazz, Class[] typeArgs) {
- this.clazz = clazz;
- this.typeArgs = typeArgs;
- }
-
- @Override
- public void initialize() {
- try {
- constructor = clazz.getConstructor(typeArgs);
- } catch (Exception e) {
- throw new CrunchRuntimeException(e);
- }
- }
-
- @Override
- public T makeTuple(Object... values) {
- try {
- return constructor.newInstance(values);
- } catch (Exception e) {
- throw new CrunchRuntimeException(e);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java
deleted file mode 100644
index cc1636c..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-
-import com.google.common.collect.Lists;
-
-/**
- * Determines the capabilities of the Avro version that is currently being used.
- */
-class AvroCapabilities {
-
- public static class Record extends org.apache.avro.specific.SpecificRecordBase implements
- org.apache.avro.specific.SpecificRecord {
- public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser()
- .parse("{\"type\":\"record\",\"name\":\"Record\",\"namespace\":\"org.apache.crunch.types.avro\",\"fields\":[{\"name\":\"subrecords\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}");
- @Deprecated
- public java.util.List<java.lang.CharSequence> subrecords;
-
- public java.lang.Object get(int field$) {
- switch (field$) {
- case 0:
- return subrecords;
- default:
- throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
-
- // Used by DatumReader. Applications should not call.
- @SuppressWarnings(value = "unchecked")
- public void put(int field$, java.lang.Object value$) {
- switch (field$) {
- case 0:
- subrecords = (java.util.List<java.lang.CharSequence>) value$;
- break;
- default:
- throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
-
- @Override
- public Schema getSchema() {
- return SCHEMA$;
- }
- }
-
- /**
- * Determine if the current Avro version can use the ReflectDatumReader to
- * read SpecificData that includes an array. The inability to do this was a
- * bug that was fixed in Avro 1.7.0.
- *
- * @return true if SpecificData can be properly read using a
- * ReflectDatumReader
- */
- static boolean canDecodeSpecificSchemaWithReflectDatumReader() {
- ReflectDatumReader<Record> datumReader = new ReflectDatumReader(Record.SCHEMA$);
- ReflectDatumWriter<Record> datumWriter = new ReflectDatumWriter(Record.SCHEMA$);
-
- Record record = new Record();
- record.subrecords = Lists.<CharSequence> newArrayList("a", "b");
-
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
-
- try {
- datumWriter.write(record, encoder);
- encoder.flush();
- BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
- byteArrayOutputStream.toByteArray(), null);
- datumReader.read(record, decoder);
- } catch (IOException ioe) {
- throw new RuntimeException("Error performing specific schema test", ioe);
- } catch (ClassCastException cce) {
- // This indicates that we're using a pre-1.7.0 version of Avro, as the
- // ReflectDatumReader in those versions could not correctly handle an
- // array in a SpecificData value
- return false;
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
deleted file mode 100644
index 0fe9288..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.ByteArrayOutputStream;
-import java.io.Serializable;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.types.DeepCopier;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Performs deep copies of Avro-serializable objects by writing the value with a
- * {@code DatumWriter} and reading it back into a new instance with a
- * {@code DatumReader}.
- * <p>
- * Only the schema (as JSON) and any subclass state are serialized; the
- * configuration, parsed schema, datum reader/writer and binary encoder/decoder
- * are transient and re-created on demand.
- * <p>
- * <b>Warning:</b> Methods in this class are not thread-safe. This shouldn't be a problem when
- * running in a map-reduce context where each mapper/reducer is running in its own JVM, but it may
- * well be a problem in any other kind of multi-threaded context.
- */
-abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
-
-  private String jsonSchema;
-  private transient Configuration conf;
-  private transient Schema schema;
-  // Reused across deepCopy() calls. Marked transient so that a copier that has
-  // already been used can still be serialized: these hold stream state, not
-  // configuration, and are re-created by the factories on the next call.
-  private transient BinaryEncoder binaryEncoder;
-  private transient BinaryDecoder binaryDecoder;
-
-  private transient DatumWriter<T> datumWriter;
-  private transient DatumReader<T> datumReader;
-
-  /**
-   * @param schema the schema of the values to copy; kept in its JSON form
-   */
-  public AvroDeepCopier(Schema schema) {
-    this.jsonSchema = schema.toString();
-  }
-
-  /**
-   * @return the schema, parsed from its JSON form on first use
-   */
-  protected Schema getSchema() {
-    if (schema == null) {
-      schema = new Schema.Parser().parse(jsonSchema);
-    }
-    return schema;
-  }
-
-  /**
-   * Stores the configuration that is later passed to the datum reader/writer
-   * factory methods.
-   */
-  @Override
-  public void initialize(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /** @return a new, empty instance that the copy is read into */
-  protected abstract T createCopyTarget();
-
-  /** @return the writer used to serialize the source value */
-  protected abstract DatumWriter<T> createDatumWriter(Configuration conf);
-
-  /** @return the reader used to populate the copy target */
-  protected abstract DatumReader<T> createDatumReader(Configuration conf);
-
-  /**
-   * Deep copier for Avro specific data objects.
-   */
-  public static class AvroSpecificDeepCopier<T> extends AvroDeepCopier<T> {
-
-    private Class<T> valueClass;
-
-    public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) {
-      super(schema);
-      this.valueClass = valueClass;
-    }
-
-    @Override
-    protected T createCopyTarget() {
-      return createNewInstance(valueClass);
-    }
-
-    @Override
-    protected DatumWriter<T> createDatumWriter(Configuration conf) {
-      return new SpecificDatumWriter<T>(getSchema());
-    }
-
-    @Override
-    protected DatumReader<T> createDatumReader(Configuration conf) {
-      return new SpecificDatumReader<T>(getSchema());
-    }
-
-  }
-
-  /**
-   * Deep copier for Avro generic data objects.
-   */
-  public static class AvroGenericDeepCopier extends AvroDeepCopier<Record> {
-
-    public AvroGenericDeepCopier(Schema schema) {
-      super(schema);
-    }
-
-    @Override
-    protected Record createCopyTarget() {
-      return new GenericData.Record(getSchema());
-    }
-
-    @Override
-    protected DatumReader<Record> createDatumReader(Configuration conf) {
-      return new GenericDatumReader<Record>(getSchema());
-    }
-
-    @Override
-    protected DatumWriter<Record> createDatumWriter(Configuration conf) {
-      return new GenericDatumWriter<Record>(getSchema());
-    }
-  }
-
-  /**
-   * Deep copier for Avro reflect data objects.
-   */
-  public static class AvroReflectDeepCopier<T> extends AvroDeepCopier<T> {
-
-    private Class<T> valueClass;
-
-    public AvroReflectDeepCopier(Class<T> valueClass, Schema schema) {
-      super(schema);
-      this.valueClass = valueClass;
-    }
-
-    @Override
-    protected T createCopyTarget() {
-      return createNewInstance(valueClass);
-    }
-
-    @Override
-    protected DatumReader<T> createDatumReader(Configuration conf) {
-      return Avros.getReflectDataFactory(conf).getReader(getSchema());
-    }
-
-    @Override
-    protected DatumWriter<T> createDatumWriter(Configuration conf) {
-      return Avros.getReflectDataFactory(conf).getWriter(getSchema());
-    }
-  }
-
-  /**
-   * Create a deep copy of an Avro value.
-   *
-   * @param source The value to be copied
-   * @return The deep copy of the value, or null if {@code source} is null
-   * @throws CrunchRuntimeException if writing or re-reading the value fails
-   */
-  @Override
-  public T deepCopy(T source) {
-
-    if (source == null) {
-      return null;
-    }
-
-    // Reader and writer are created lazily with whatever configuration
-    // initialize() stored.
-    if (datumReader == null) {
-      datumReader = createDatumReader(conf);
-    }
-    if (datumWriter == null) {
-      datumWriter = createDatumWriter(conf);
-    }
-    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
-    // Passing the previous encoder/decoder lets the factories reuse them.
-    binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
-    T target = createCopyTarget();
-    try {
-      datumWriter.write(source, binaryEncoder);
-      binaryEncoder.flush();
-      binaryDecoder = DecoderFactory.get()
-          .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
-      datumReader.read(target, binaryDecoder);
-    } catch (Exception e) {
-      throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);
-    }
-
-    return target;
-  }
-
-  /**
-   * Instantiates {@code targetClass} through its no-argument constructor.
-   *
-   * @throws CrunchRuntimeException if the class cannot be instantiated or accessed
-   */
-  protected T createNewInstance(Class<T> targetClass) {
-    try {
-      return targetClass.newInstance();
-    } catch (InstantiationException e) {
-      throw new CrunchRuntimeException(e);
-    } catch (IllegalAccessException e) {
-      throw new CrunchRuntimeException(e);
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
deleted file mode 100644
index 598868f..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.util.Collection;
-
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroKeyComparator;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.fn.PairMapFn;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- *
- *
- */
-class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
-
- private static final AvroPairConverter CONVERTER = new AvroPairConverter();
- private final MapFn inputFn;
- private final MapFn outputFn;
-
- public AvroGroupedTableType(AvroTableType<K, V> tableType) {
- super(tableType);
- AvroType keyType = (AvroType) tableType.getKeyType();
- AvroType valueType = (AvroType) tableType.getValueType();
- this.inputFn = new PairIterableMapFn(keyType.getInputMapFn(), valueType.getInputMapFn());
- this.outputFn = new PairMapFn(keyType.getOutputMapFn(), valueType.getOutputMapFn());
- }
-
- @Override
- public Class<Pair<K, Iterable<V>>> getTypeClass() {
- return (Class<Pair<K, Iterable<V>>>) Pair.of(null, null).getClass();
- }
-
- @Override
- public Converter getGroupingConverter() {
- return CONVERTER;
- }
-
- @Override
- public MapFn getInputMapFn() {
- return inputFn;
- }
-
- @Override
- public MapFn getOutputMapFn() {
- return outputFn;
- }
-
- @Override
- public void initialize(Configuration conf) {
- getTableType().initialize(conf);
- }
-
- @Override
- public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
- return PTables.getGroupedDetachedValue(this, value);
- }
-
- @Override
- public void configureShuffle(Job job, GroupingOptions options) {
- AvroTableType<K, V> att = (AvroTableType<K, V>) tableType;
- String schemaJson = att.getSchema().toString();
- Configuration conf = job.getConfiguration();
-
- if (att.hasReflect()) {
- if (att.hasSpecific()) {
- Avros.checkCombiningSpecificAndReflectionSchemas();
- }
- conf.setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true);
- }
- conf.set(AvroJob.MAP_OUTPUT_SCHEMA, schemaJson);
- job.setSortComparatorClass(AvroKeyComparator.class);
- job.setMapOutputKeyClass(AvroKey.class);
- job.setMapOutputValueClass(AvroValue.class);
- if (options != null) {
- options.configure(job);
- }
-
- Avros.configureReflectDataFactory(conf);
-
- Collection<String> serializations = job.getConfiguration().getStringCollection(
- "io.serializations");
- if (!serializations.contains(SafeAvroSerialization.class.getName())) {
- serializations.add(SafeAvroSerialization.class.getName());
- job.getConfiguration().setStrings("io.serializations", serializations.toArray(new String[0]));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
deleted file mode 100644
index b8bbebd..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/** An {@link org.apache.hadoop.mapreduce.InputFormat} for Avro data files. */
-public class AvroInputFormat<T> extends FileInputFormat<AvroWrapper<T>, NullWritable> {
-
-  /**
-   * Creates a reader for {@code split} using the schema stored in the job configuration under
-   * {@link AvroJob#INPUT_SCHEMA}. The task status is set to the split's string form.
-   *
-   * @param split the split to read
-   * @param context the task context supplying the configuration
-   * @return a new {@link AvroRecordReader} for the configured input schema
-   */
-  @Override
-  public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    context.setStatus(split.toString());
-    String schemaJson = context.getConfiguration().get(AvroJob.INPUT_SCHEMA);
-    Schema.Parser parser = new Schema.Parser();
-    Schema inputSchema = parser.parse(schemaJson);
-    return new AvroRecordReader<T>(inputSchema);
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
deleted file mode 100644
index 68b717d..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.crunch.types.Converter;
-import org.apache.hadoop.io.NullWritable;
-
-/**
- * {@link Converter} for records carried as an {@link AvroWrapper} key paired with a
- * {@link NullWritable} value. Grouped (iterable) input is not supported.
- */
-class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritable, K, Iterable<K>> {
-
-  // Created on first use and then reused for every outgoing record.
-  private transient AvroWrapper<K> wrapper = null;
-
-  /** Returns the datum held by {@code key}; {@code value} is ignored. */
-  @Override
-  public K convertInput(AvroWrapper<K> key, NullWritable value) {
-    return key.datum();
-  }
-
-  /** Stores {@code value} in the shared wrapper and returns that wrapper. */
-  @Override
-  public AvroWrapper<K> outputKey(K value) {
-    AvroWrapper<K> shared = getWrapper();
-    shared.datum(value);
-    return shared;
-  }
-
-  @Override
-  public NullWritable outputValue(K value) {
-    return NullWritable.get();
-  }
-
-  @Override
-  public Class<AvroWrapper<K>> getKeyClass() {
-    AvroWrapper<K> shared = getWrapper();
-    return (Class<AvroWrapper<K>>) shared.getClass();
-  }
-
-  @Override
-  public Class<NullWritable> getValueClass() {
-    return NullWritable.class;
-  }
-
-  private AvroWrapper<K> getWrapper() {
-    if (wrapper != null) {
-      return wrapper;
-    }
-    wrapper = new AvroWrapper<K>();
-    return wrapper;
-  }
-
-  /** Always throws: this converter does not handle grouped input. */
-  @Override
-  public Iterable<K> convertIterableInput(AvroWrapper<K> key, Iterable<NullWritable> value) {
-    throw new UnsupportedOperationException("Should not be possible");
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
deleted file mode 100644
index 98d3f50..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-/** An {@link org.apache.hadoop.mapreduce.OutputFormat} for Avro data files. */
-public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
-
-  /**
-   * Opens an Avro data file in the task's default work file and returns a writer that appends
-   * the datum of every {@link AvroWrapper} it is given.
-   *
-   * @param context the task context supplying the configuration and work path
-   * @return a record writer backed by a {@link DataFileWriter}
-   */
-  @Override
-  public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException,
-      InterruptedException {
-
-    Configuration conf = context.getConfiguration();
-    Schema schema = resolveOutputSchema(conf);
-
-    ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-    final DataFileWriter<T> dataFileWriter = new DataFileWriter<T>(factory.<T> getWriter(schema));
-
-    JobConf jc = new JobConf(conf);
-    /* copied from org.apache.avro.mapred.AvroOutputFormat */
-
-    if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jc)) {
-      dataFileWriter.setCodec(resolveCodec(conf));
-    }
-
-    int syncInterval = jc.getInt(org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY,
-        org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL);
-    dataFileWriter.setSyncInterval(syncInterval);
-
-    Path path = getDefaultWorkFile(context, org.apache.avro.mapred.AvroOutputFormat.EXT);
-    dataFileWriter.create(schema, path.getFileSystem(context.getConfiguration()).create(path));
-
-    return new RecordWriter<AvroWrapper<T>, NullWritable>() {
-      @Override
-      public void write(AvroWrapper<T> wrapper, NullWritable ignore) throws IOException {
-        dataFileWriter.append(wrapper.datum());
-      }
-
-      @Override
-      public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-        dataFileWriter.close();
-      }
-    };
-  }
-
-  /**
-   * Picks the output schema: the per-output schema stored under
-   * {@code avro.output.schema.<name>} when {@code crunch.namedoutput} is set and non-empty,
-   * otherwise the job-level Avro output schema.
-   */
-  private static Schema resolveOutputSchema(Configuration conf) {
-    String outputName = conf.get("crunch.namedoutput");
-    if (outputName == null || outputName.isEmpty()) {
-      return AvroJob.getOutputSchema(conf);
-    }
-    return (new Schema.Parser()).parse(conf.get("avro.output.schema." + outputName));
-  }
-
-  /**
-   * Picks the codec named by {@link AvroJob#OUTPUT_CODEC}, defaulting to deflate; the deflate
-   * codec is created with the configured deflate level.
-   */
-  private static CodecFactory resolveCodec(Configuration conf) {
-    int level = conf.getInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
-        org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
-    String codecName = conf.get(AvroJob.OUTPUT_CODEC,
-        org.apache.avro.file.DataFileConstants.DEFLATE_CODEC);
-    if (codecName.equals(org.apache.avro.file.DataFileConstants.DEFLATE_CODEC)) {
-      return CodecFactory.deflateCodec(level);
-    }
-    return CodecFactory.fromString(codecName);
-  }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
deleted file mode 100644
index d1d2627..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.util.Iterator;
-
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.Converter;
-
-class AvroPairConverter<K, V> implements Converter<AvroKey<K>, AvroValue<V>, Pair<K, V>, Pair<K, Iterable<V>>> {
-
- private transient AvroKey<K> keyWrapper = null;
- private transient AvroValue<V> valueWrapper = null;
-
- @Override
- public Pair<K, V> convertInput(AvroKey<K> key, AvroValue<V> value) {
- return Pair.of(key.datum(), value.datum());
- }
-
- public Pair<K, Iterable<V>> convertIterableInput(AvroKey<K> key, Iterable<AvroValue<V>> iter) {
- Iterable<V> it = new AvroWrappedIterable<V>(iter);
- return Pair.of(key.datum(), it);
- }
-
- @Override
- public AvroKey<K> outputKey(Pair<K, V> value) {
- getKeyWrapper().datum(value.first());
- return keyWrapper;
- }
-
- @Override
- public AvroValue<V> outputValue(Pair<K, V> value) {
- getValueWrapper().datum(value.second());
- return valueWrapper;
- }
-
- @Override
- public Class<AvroKey<K>> getKeyClass() {
- return (Class<AvroKey<K>>) getKeyWrapper().getClass();
- }
-
- @Override
- public Class<AvroValue<V>> getValueClass() {
- return (Class<AvroValue<V>>) getValueWrapper().getClass();
- }
-
- private AvroKey<K> getKeyWrapper() {
- if (keyWrapper == null) {
- keyWrapper = new AvroKey<K>();
- }
- return keyWrapper;
- }
-
- private AvroValue<V> getValueWrapper() {
- if (valueWrapper == null) {
- valueWrapper = new AvroValue<V>();
- }
- return valueWrapper;
- }
-
- private static class AvroWrappedIterable<V> implements Iterable<V> {
-
- private final Iterable<AvroValue<V>> iters;
-
- public AvroWrappedIterable(Iterable<AvroValue<V>> iters) {
- this.iters = iters;
- }
-
- @Override
- public Iterator<V> iterator() {
- return new Iterator<V>() {
- private final Iterator<AvroValue<V>> it = iters.iterator();
-
- @Override
- public boolean hasNext() {
- return it.hasNext();
- }
-
- @Override
- public V next() {
- return it.next().datum();
- }
-
- @Override
- public void remove() {
- it.remove();
- }
- };
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
deleted file mode 100644
index 9c7578c..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.mapred.FsInput;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-/** An {@link RecordReader} for Avro data files. */
-class AvroRecordReader<T> extends RecordReader<AvroWrapper<T>, NullWritable> {
-
-  private FileReader<T> reader;
-  private long start;
-  private long end;
-  private AvroWrapper<T> key;
-  private NullWritable value;
-  private final Schema schema;
-
-  /**
-   * @param schema the schema handed to the datum reader when the split is opened
-   */
-  public AvroRecordReader(Schema schema) {
-    this.schema = schema;
-  }
-
-  /**
-   * Opens the file behind {@code genericSplit} and positions the reader at the first sync
-   * point at or after the split start. A reflect-based datum reader is used unless
-   * {@link AvroJob#INPUT_IS_REFLECT} is explicitly set to {@code false}, in which case a
-   * {@link SpecificDatumReader} is used.
-   *
-   * @param genericSplit the split to read; must be a {@link FileSplit}
-   * @param context the task context supplying the configuration
-   */
-  @Override
-  public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
-    FileSplit split = (FileSplit) genericSplit;
-    Configuration conf = context.getConfiguration();
-    SeekableInput in = new FsInput(split.getPath(), conf);
-    DatumReader<T> datumReader = null;
-    if (context.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true)) {
-      ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-      datumReader = factory.getReader(schema);
-    } else {
-      datumReader = new SpecificDatumReader<T>(schema);
-    }
-    this.reader = DataFileReader.openReader(in, datumReader);
-    reader.sync(split.getStart()); // sync to start
-    this.start = reader.tell();
-    this.end = split.getStart() + split.getLength();
-  }
-
-  /**
-   * Advances to the next record. The key wrapper and its datum are passed back to the file
-   * reader as reuse candidates, so callers must not hold on to a previous key.
-   *
-   * @return {@code false} once the file is exhausted or the reader is past the split end
-   */
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (!reader.hasNext() || reader.pastSync(end)) {
-      key = null;
-      value = null;
-      return false;
-    }
-    if (key == null) {
-      key = new AvroWrapper<T>();
-    }
-    if (value == null) {
-      value = NullWritable.get();
-    }
-    key.datum(reader.next(key.datum()));
-    return true;
-  }
-
-  @Override
-  public AvroWrapper<T> getCurrentKey() throws IOException, InterruptedException {
-    return key;
-  }
-
-  @Override
-  public NullWritable getCurrentValue() throws IOException, InterruptedException {
-    return value;
-  }
-
-  /**
-   * @return the fraction of the split consumed so far, capped at {@code 1.0}; {@code 0.0}
-   *         when the split is empty
-   */
-  @Override
-  public float getProgress() throws IOException {
-    if (end == start) {
-      return 0.0f;
-    } else {
-      return Math.min(1.0f, (getPos() - start) / (float) (end - start));
-    }
-  }
-
-  /** @return the current position reported by the underlying file reader */
-  public long getPos() throws IOException {
-    return reader.tell();
-  }
-
-  /**
-   * Closes the underlying file reader. Does nothing when no reader was opened, so closing a
-   * reader whose {@code initialize} never ran or failed does not raise a
-   * {@link NullPointerException} that would hide the original failure.
-   */
-  @Override
-  public void close() throws IOException {
-    if (reader != null) {
-      reader.close();
-    }
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
deleted file mode 100644
index 86613df..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.TupleDeepCopier;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-/**
- * The implementation of the PTableType interface for Avro-based serialization.
- *
- */
-class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K, V> {
-
- private static class PairToAvroPair extends MapFn<Pair, org.apache.avro.mapred.Pair> {
- private final MapFn keyMapFn;
- private final MapFn valueMapFn;
- private final String firstJson;
- private final String secondJson;
-
- private String pairSchemaJson;
- private transient Schema pairSchema;
-
- public PairToAvroPair(AvroType keyType, AvroType valueType) {
- this.keyMapFn = keyType.getOutputMapFn();
- this.firstJson = keyType.getSchema().toString();
- this.valueMapFn = valueType.getOutputMapFn();
- this.secondJson = valueType.getSchema().toString();
- }
-
- @Override
- public void configure(Configuration conf) {
- keyMapFn.configure(conf);
- valueMapFn.configure(conf);
- }
-
- @Override
- public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
- keyMapFn.setContext(context);
- valueMapFn.setContext(context);
- }
-
- @Override
- public void initialize() {
- keyMapFn.initialize();
- valueMapFn.initialize();
- pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
- new Schema.Parser().parse(firstJson), new Schema.Parser().parse(secondJson)).toString();
- }
-
- @Override
- public org.apache.avro.mapred.Pair map(Pair input) {
- if (pairSchema == null) {
- pairSchema = new Schema.Parser().parse(pairSchemaJson);
- }
- org.apache.avro.mapred.Pair avroPair = new org.apache.avro.mapred.Pair(pairSchema);
- avroPair.key(keyMapFn.map(input.first()));
- avroPair.value(valueMapFn.map(input.second()));
- return avroPair;
- }
- }
-
- private static class IndexedRecordToPair extends MapFn<IndexedRecord, Pair> {
-
- private final MapFn firstMapFn;
- private final MapFn secondMapFn;
-
- public IndexedRecordToPair(MapFn firstMapFn, MapFn secondMapFn) {
- this.firstMapFn = firstMapFn;
- this.secondMapFn = secondMapFn;
- }
-
- @Override
- public void configure(Configuration conf) {
- firstMapFn.configure(conf);
- secondMapFn.configure(conf);
- }
-
- @Override
- public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
- firstMapFn.setContext(context);
- secondMapFn.setContext(context);
- }
-
- @Override
- public void initialize() {
- firstMapFn.initialize();
- secondMapFn.initialize();
- }
-
- @Override
- public Pair map(IndexedRecord input) {
- return Pair.of(firstMapFn.map(input.get(0)), secondMapFn.map(input.get(1)));
- }
- }
-
- private final AvroType<K> keyType;
- private final AvroType<V> valueType;
-
- public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) {
- super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(),
- valueType.getSchema()), new IndexedRecordToPair(keyType.getInputMapFn(),
- valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType), new TupleDeepCopier(
- Pair.class, keyType, valueType), keyType, valueType);
- this.keyType = keyType;
- this.valueType = valueType;
- }
-
- @Override
- public PType<K> getKeyType() {
- return keyType;
- }
-
- @Override
- public PType<V> getValueType() {
- return valueType;
- }
-
- @Override
- public PGroupedTableType<K, V> getGroupedTableType() {
- return new AvroGroupedTableType<K, V>(this);
- }
-
- @Override
- public Pair<K, V> getDetachedValue(Pair<K, V> value) {
- return PTables.getDetachedValue(this, value);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
deleted file mode 100644
index 4930235..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
-/**
- * A {@link TextOutputFormat} whose record writer replaces any {@link AvroWrapper} key or value
- * with the datum it carries before passing it to the underlying text writer.
- */
-public class AvroTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
-
-  /** Delegating writer that unwraps {@link AvroWrapper}s on the way through. */
-  class DatumRecordTextWriter extends RecordWriter<K, V> {
-    private RecordWriter lineRecordWriter;
-
-    public DatumRecordTextWriter(RecordWriter recordWriter) {
-      this.lineRecordWriter = recordWriter;
-    }
-
-    @Override
-    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-      lineRecordWriter.close(context);
-    }
-
-    @Override
-    public void write(K arg0, V arg1) throws IOException, InterruptedException {
-      Object keyData = getData(arg0);
-      Object valueData = getData(arg1);
-      lineRecordWriter.write(keyData, valueData);
-    }
-
-    /** Returns the wrapped datum for an {@link AvroWrapper}, otherwise the object itself. */
-    private Object getData(Object o) {
-      if (o instanceof AvroWrapper) {
-        return ((AvroWrapper) o).datum();
-      }
-      return o;
-    }
-  }
-
-  /** Wraps the writer created by {@link TextOutputFormat} in a {@link DatumRecordTextWriter}. */
-  @Override
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-    RecordWriter<K, V> textWriter = super.getRecordWriter(context);
-    return new DatumRecordTextWriter(textWriter);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
deleted file mode 100644
index a92b0d0..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.io.avro.AvroFileSourceTarget;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.DeepCopier;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-/**
- * A {@link PType} implementation whose values are serialized with Avro.
- *
- * @param <T> the Java type described by this PType
- */
-public class AvroType<T> implements PType<T> {
-
-  // One converter instance is shared by every AvroType.
-  private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
-
-  private final Class<T> typeClass;
-  // String form of the schema; used to rebuild the transient schema on demand.
-  private final String schemaString;
-  private transient Schema schema;
-  private final MapFn baseInputMapFn;
-  private final MapFn baseOutputMapFn;
-  private final List<PType> subTypes;
-  private DeepCopier<T> deepCopier;
-  // Set by initialize(Configuration); getDetachedValue refuses to run before that.
-  private boolean initialized = false;
-
-  /**
-   * Creates a type that uses identity functions for both input and output mapping.
-   *
-   * @param typeClass the Java class of the values
-   * @param schema the Avro schema; must not be null
-   * @param deepCopier the copier used by {@link #getDetachedValue(Object)}
-   * @param ptypes the sub types of this type, if any
-   */
-  public AvroType(Class<T> typeClass, Schema schema, DeepCopier<T> deepCopier, PType... ptypes) {
-    this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, ptypes);
-  }
-
-  /**
-   * Creates a type with explicit input and output mapping functions.
-   *
-   * @param typeClass the Java class of the values
-   * @param schema the Avro schema; must not be null
-   * @param inputMapFn the function returned by {@link #getInputMapFn()}
-   * @param outputMapFn the function returned by {@link #getOutputMapFn()}
-   * @param deepCopier the copier used by {@link #getDetachedValue(Object)}
-   * @param ptypes the sub types of this type, if any
-   */
-  public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn,
-      DeepCopier<T> deepCopier, PType... ptypes) {
-    this.typeClass = typeClass;
-    this.schema = Preconditions.checkNotNull(schema);
-    this.schemaString = schema.toString();
-    this.baseInputMapFn = inputMapFn;
-    this.baseOutputMapFn = outputMapFn;
-    this.deepCopier = deepCopier;
-    this.subTypes = ImmutableList.<PType> copyOf(ptypes);
-  }
-
-  @Override
-  public Class<T> getTypeClass() {
-    return typeClass;
-  }
-
-  @Override
-  public PTypeFamily getFamily() {
-    return AvroTypeFamily.getInstance();
-  }
-
-  /**
-   * @return a new list holding the sub types; changes to it do not affect this type
-   */
-  @Override
-  public List<PType> getSubTypes() {
-    List<PType> copy = Lists.<PType> newArrayList(subTypes);
-    return copy;
-  }
-
-  /**
-   * @return the Avro schema, parsed again from its string form when the
-   *         transient field is unset
-   */
-  public Schema getSchema() {
-    if (schema != null) {
-      return schema;
-    }
-    schema = new Schema.Parser().parse(schemaString);
-    return schema;
-  }
-
-  /**
-   * Determine if the wrapped type is a specific data avro type or wraps one.
-   *
-   * @return true if the wrapped type is a specific data type or wraps one
-   */
-  public boolean hasSpecific() {
-    if (Avros.isPrimitive(this)) {
-      return false;
-    }
-
-    // A leaf type is judged by its own class.
-    if (this.subTypes.isEmpty()) {
-      return SpecificRecord.class.isAssignableFrom(typeClass);
-    }
-
-    // A composite type is judged only by its sub types.
-    for (PType<?> child : this.subTypes) {
-      if (((AvroType<?>) child).hasSpecific()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Determine if the wrapped type is a generic data avro type.
-   *
-   * @return true if the wrapped type is a generic type
-   */
-  public boolean isGeneric() {
-    return GenericData.Record.class.equals(typeClass);
-  }
-
-  /**
-   * Determine if the wrapped type is a reflection-based avro type or wraps one.
-   *
-   * @return true if the wrapped type is a reflection-based type or wraps one.
-   */
-  public boolean hasReflect() {
-    if (Avros.isPrimitive(this)) {
-      return false;
-    }
-
-    // A leaf type counts as reflection-based when it is neither generic nor specific.
-    if (this.subTypes.isEmpty()) {
-      return !typeClass.equals(GenericData.Record.class)
-          && !SpecificRecord.class.isAssignableFrom(typeClass);
-    }
-
-    // A composite type is judged only by its sub types.
-    for (PType<?> child : this.subTypes) {
-      AvroType<?> avroChild = (AvroType<?>) child;
-      if (avroChild.hasReflect()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * @return the input mapping function supplied at construction
-   */
-  public MapFn<Object, T> getInputMapFn() {
-    return baseInputMapFn;
-  }
-
-  /**
-   * @return the output mapping function supplied at construction
-   */
-  public MapFn<T, Object> getOutputMapFn() {
-    return baseOutputMapFn;
-  }
-
-  @Override
-  public Converter getConverter() {
-    return AVRO_CONVERTER;
-  }
-
-  @Override
-  public ReadableSourceTarget<T> getDefaultFileSource(Path path) {
-    return new AvroFileSourceTarget<T>(path, this);
-  }
-
-  /**
-   * Initializes the deep copier and marks this type as ready for
-   * {@link #getDetachedValue(Object)}.
-   */
-  @Override
-  public void initialize(Configuration conf) {
-    deepCopier.initialize(conf);
-    initialized = true;
-  }
-
-  /**
-   * @return a deep copy of {@code value} produced by the configured copier
-   * @throws IllegalStateException if {@link #initialize(Configuration)} has not been called
-   */
-  @Override
-  public T getDetachedValue(T value) {
-    if (initialized) {
-      return deepCopier.deepCopy(value);
-    }
-    throw new IllegalStateException("Cannot call getDetachedValue on an uninitialized PType");
-  }
-
-  /**
-   * Two AvroTypes are equal when their type classes and sub types are equal.
-   */
-  @Override
-  public boolean equals(Object other) {
-    // instanceof is false for null, so no separate null test is required.
-    if (!(other instanceof AvroType)) {
-      return false;
-    }
-    AvroType that = (AvroType) other;
-    if (!typeClass.equals(that.typeClass)) {
-      return false;
-    }
-    return subTypes.equals(that.subTypes);
-  }
-
-  @Override
-  public int hashCode() {
-    // Built from the same two fields that equals compares.
-    return new HashCodeBuilder().append(typeClass).append(subTypes).toHashCode();
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
deleted file mode 100644
index e09e173..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.PTypeUtils;
-
-/**
- * The {@link PTypeFamily} for Avro. Every factory method delegates to the
- * static method of the same name on {@code Avros}.
- */
-public class AvroTypeFamily implements PTypeFamily {
-
-  private static final AvroTypeFamily INSTANCE = new AvroTypeFamily();
-
-  /**
-   * @return the single shared instance of this family
-   */
-  public static AvroTypeFamily getInstance() {
-    return INSTANCE;
-  }
-
-  // Private so that INSTANCE stays the only instance.
-  private AvroTypeFamily() {
-  }
-
-  // ---- Primitive types ----
-
-  @Override
-  public PType<Void> nulls() {
-    return Avros.nulls();
-  }
-
-  @Override
-  public PType<String> strings() {
-    return Avros.strings();
-  }
-
-  @Override
-  public PType<Long> longs() {
-    return Avros.longs();
-  }
-
-  @Override
-  public PType<Integer> ints() {
-    return Avros.ints();
-  }
-
-  @Override
-  public PType<Float> floats() {
-    return Avros.floats();
-  }
-
-  @Override
-  public PType<Double> doubles() {
-    return Avros.doubles();
-  }
-
-  @Override
-  public PType<Boolean> booleans() {
-    return Avros.booleans();
-  }
-
-  @Override
-  public PType<ByteBuffer> bytes() {
-    return Avros.bytes();
-  }
-
-  // ---- Record types ----
-
-  @Override
-  public <T> PType<T> records(Class<T> clazz) {
-    return Avros.records(clazz);
-  }
-
-  /**
-   * @return the type {@code Avros.generics} builds for {@code schema}
-   */
-  public PType<GenericData.Record> generics(Schema schema) {
-    return Avros.generics(schema);
-  }
-
-  /**
-   * @return the type {@code Avros.containers} builds for {@code clazz}
-   */
-  public <T> PType<T> containers(Class<T> clazz) {
-    return Avros.containers(clazz);
-  }
-
-  // ---- Collection, tuple and table types ----
-
-  @Override
-  public <T> PType<Collection<T>> collections(PType<T> ptype) {
-    return Avros.collections(ptype);
-  }
-
-  @Override
-  public <T> PType<Map<String, T>> maps(PType<T> ptype) {
-    return Avros.maps(ptype);
-  }
-
-  @Override
-  public <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
-    return Avros.pairs(p1, p2);
-  }
-
-  @Override
-  public <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
-    return Avros.triples(p1, p2, p3);
-  }
-
-  @Override
-  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) {
-    return Avros.quads(p1, p2, p3, p4);
-  }
-
-  @Override
-  public PType<TupleN> tuples(PType<?>... ptypes) {
-    return Avros.tuples(ptypes);
-  }
-
-  @Override
-  public <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value) {
-    return Avros.tableOf(key, value);
-  }
-
-  /**
-   * Returns an Avro-family equivalent of {@code ptype}.
-   * <p>
-   * Instances of {@code AvroType} or {@code AvroGroupedTableType} are returned
-   * unchanged. Any other grouped table type is rebuilt around the converted
-   * form of its table type. Otherwise the primitive type registered for the
-   * type class is used when there is one, and {@link PTypeUtils#convert} is the
-   * final fallback.
-   */
-  @Override
-  public <T> PType<T> as(PType<T> ptype) {
-    boolean alreadyAvro = ptype instanceof AvroType || ptype instanceof AvroGroupedTableType;
-    if (alreadyAvro) {
-      return ptype;
-    }
-
-    if (ptype instanceof PGroupedTableType) {
-      AvroTableType avroTable = (AvroTableType) as(((PGroupedTableType) ptype).getTableType());
-      return new AvroGroupedTableType(avroTable);
-    }
-
-    PType<T> primitive = Avros.getPrimitiveType(ptype.getTypeClass());
-    if (primitive == null) {
-      return PTypeUtils.convert(ptype, this);
-    }
-    return primitive;
-  }
-
-  @Override
-  public <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes) {
-    return Avros.tuples(clazz, ptypes);
-  }
-
-  @Override
-  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
-    return Avros.derived(clazz, inputFn, outputFn, base);
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
deleted file mode 100644
index 9460fa5..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.util.Utf8;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
-
-/**
- * An {@link org.apache.hadoop.mapreduce.InputFormat} for text files. Each line
- * is a {@link Utf8} key; values are null.
- */
-public class AvroUtf8InputFormat extends FileInputFormat<AvroWrapper<Utf8>, NullWritable> {
-
-  /**
-   * Adapts a {@link LineRecordReader} so that each line is exposed as a
-   * {@link Utf8} key inside an {@link AvroWrapper}, paired with a
-   * {@link NullWritable} value.
-   */
-  static class Utf8LineRecordReader extends RecordReader<AvroWrapper<Utf8>, NullWritable> {
-
-    private final LineRecordReader lineRecordReader;
-
-    // One wrapper is reused for every record; only its datum is replaced.
-    private final AvroWrapper<Utf8> currentKey = new AvroWrapper<Utf8>();
-
-    public Utf8LineRecordReader() throws IOException {
-      this.lineRecordReader = new LineRecordReader();
-    }
-
-    @Override
-    public void close() throws IOException {
-      lineRecordReader.close();
-    }
-
-    @Override
-    public float getProgress() throws IOException {
-      return lineRecordReader.getProgress();
-    }
-
-    /**
-     * @return the shared wrapper, updated to hold the current line as a {@link Utf8}
-     */
-    @Override
-    public AvroWrapper<Utf8> getCurrentKey() throws IOException, InterruptedException {
-      // The line text is the *value* of the underlying reader; its key is not used here.
-      Text txt = lineRecordReader.getCurrentValue();
-      currentKey.datum(new Utf8(txt.toString()));
-      return currentKey;
-    }
-
-    @Override
-    public NullWritable getCurrentValue() throws IOException, InterruptedException {
-      return NullWritable.get();
-    }
-
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-      lineRecordReader.initialize(split, context);
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      return lineRecordReader.nextKeyValue();
-    }
-  }
-
-  // Only populated by the legacy configure(Configuration) hook below.
-  private CompressionCodecFactory compressionCodecs = null;
-
-  /**
-   * Legacy hook kept for source compatibility. Nothing in this class calls it;
-   * split decisions are made by {@link #isSplitable(JobContext, Path)}.
-   *
-   * @param conf the configuration used to look up compression codecs
-   */
-  public void configure(Configuration conf) {
-    compressionCodecs = new CompressionCodecFactory(conf);
-  }
-
-  /**
-   * Legacy variant kept for source compatibility. Falls back to the file
-   * system's configuration when {@link #configure(Configuration)} was never
-   * called, instead of failing with a NullPointerException.
-   *
-   * @return true if no compression codec is registered for {@code file}
-   */
-  protected boolean isSplitable(FileSystem fs, Path file) {
-    CompressionCodecFactory codecs = compressionCodecs;
-    if (codecs == null) {
-      codecs = new CompressionCodecFactory(fs.getConf());
-    }
-    return codecs.getCodec(file) == null;
-  }
-
-  /**
-   * A file may be split only when no compression codec is registered for it.
-   * This overrides the {@code isSplitable(JobContext, Path)} hook of the
-   * new-API {@link FileInputFormat}, so the check no longer depends on the
-   * legacy methods above being called.
-   *
-   * @return true if no compression codec is registered for {@code file}
-   */
-  @Override
-  protected boolean isSplitable(JobContext context, Path file) {
-    return new CompressionCodecFactory(context.getConfiguration()).getCodec(file) == null;
-  }
-
-  @Override
-  public RecordReader<AvroWrapper<Utf8>, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    return new Utf8LineRecordReader();
-  }
-}