You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:33 UTC
[31/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
new file mode 100644
index 0000000..ccf4fb5
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
+ * writing to multiple files within a single MapReduce job.
+ *
+ * <p>Named outputs are registered with one of the {@code addNamedOutput} methods, which append
+ * a record to the {@link #CRUNCH_OUTPUTS} configuration property. An instance of this class
+ * reads those records back from the task's configuration and lazily creates one
+ * {@code RecordWriter} per named output the first time it is written to.
+ */
+public class CrunchOutputs<K, V> {
+  /** Configuration property holding the serialized list of named outputs. */
+  public static final String CRUNCH_OUTPUTS = "crunch.outputs.dir";
+
+  // Records (one per named output) are separated by ',' and the fields inside a record by ';'.
+  private static final char RECORD_SEP = ',';
+  private static final char FIELD_SEP = ';';
+  private static final Joiner JOINER = Joiner.on(FIELD_SEP);
+  private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
+
+  /**
+   * Registers a named output that uses the given output format class with no extra
+   * configuration.
+   *
+   * @param job the job whose configuration receives the named output
+   * @param name the name of the output; it is stored in a delimited string, so it must not
+   *        contain {@code ','} or {@code ';'}
+   * @param outputFormatClass the {@code OutputFormat} used to write this output
+   * @param keyClass the key class of the output
+   * @param valueClass the value class of the output
+   */
+  public static void addNamedOutput(Job job, String name,
+      Class<? extends OutputFormat> outputFormatClass,
+      Class keyClass, Class valueClass) {
+    addNamedOutput(job, name, FormatBundle.forOutput(outputFormatClass), keyClass, valueClass);
+  }
+
+  /**
+   * Registers a named output described by a {@link FormatBundle} by appending a record to
+   * the {@link #CRUNCH_OUTPUTS} property of the job's configuration.
+   *
+   * @param job the job whose configuration receives the named output
+   * @param name the name of the output; it is stored in a delimited string, so it must not
+   *        contain {@code ','} or {@code ';'}
+   * @param outputBundle the output format and its extra configuration
+   * @param keyClass the key class of the output
+   * @param valueClass the value class of the output
+   */
+  public static void addNamedOutput(Job job, String name,
+      FormatBundle<? extends OutputFormat> outputBundle,
+      Class keyClass, Class valueClass) {
+    Configuration conf = job.getConfiguration();
+    String record = JOINER.join(name, outputBundle.serialize(), keyClass.getName(),
+        valueClass.getName());
+    String existing = conf.get(CRUNCH_OUTPUTS);
+    conf.set(CRUNCH_OUTPUTS, existing == null ? record : existing + RECORD_SEP + record);
+  }
+
+  /** The deserialized description of a single named output. */
+  private static class OutputConfig<K, V> {
+    public FormatBundle<OutputFormat<K, V>> bundle;
+    public Class<K> keyClass;
+    public Class<V> valueClass;
+
+    public OutputConfig(FormatBundle<OutputFormat<K, V>> bundle,
+        Class<K> keyClass, Class<V> valueClass) {
+      this.bundle = bundle;
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+    }
+  }
+
+  /**
+   * Parses the {@link #CRUNCH_OUTPUTS} property of the context's configuration.
+   *
+   * @param context the task context whose configuration is read
+   * @return a map from output name to its configuration; empty when the property is unset
+   *         or empty
+   * @throws CrunchRuntimeException if a key or value class cannot be loaded
+   */
+  private static Map<String, OutputConfig> getNamedOutputs(
+      TaskInputOutputContext<?, ?, ?, ?> context) {
+    Map<String, OutputConfig> out = Maps.newHashMap();
+    String serialized = context.getConfiguration().get(CRUNCH_OUTPUTS);
+    // A job without named outputs has nothing to parse; splitting null or "" would fail.
+    if (serialized == null || serialized.isEmpty()) {
+      return out;
+    }
+    for (String input : Splitter.on(RECORD_SEP).split(serialized)) {
+      // Field order mirrors addNamedOutput: name, bundle, key class, value class.
+      List<String> fields = Lists.newArrayList(SPLITTER.split(input));
+      String name = fields.get(0);
+      FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1),
+          OutputFormat.class);
+      try {
+        Class<?> keyClass = Class.forName(fields.get(2));
+        Class<?> valueClass = Class.forName(fields.get(3));
+        out.put(name, new OutputConfig(bundle, keyClass, valueClass));
+      } catch (ClassNotFoundException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+    return out;
+  }
+
+  private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
+  private static final String COUNTERS_GROUP = CrunchOutputs.class.getName();
+
+  private final TaskInputOutputContext<?, ?, K, V> baseContext;
+  private final Map<String, OutputConfig> namedOutputs;
+  private final Map<String, RecordWriter<K, V>> recordWriters;
+  private final Map<String, TaskAttemptContext> taskContextCache;
+
+  /**
+   * Creates and initializes multiple outputs support,
+   * it should be instantiated in the Mapper/Reducer setup method.
+   *
+   * @param context the TaskInputOutputContext object
+   */
+  public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> context) {
+    this.baseContext = context;
+    namedOutputs = getNamedOutputs(context);
+    recordWriters = Maps.newHashMap();
+    taskContextCache = Maps.newHashMap();
+  }
+
+  /**
+   * Writes a key/value pair to the given named output and increments the counter named after
+   * the output in this class's counter group.
+   *
+   * @param namedOutput the name of a registered output
+   * @param key the key to write
+   * @param value the value to write
+   * @throws IllegalArgumentException if {@code namedOutput} was never registered
+   * @throws IOException if the record writer cannot be created or the write fails
+   * @throws InterruptedException if the underlying writer is interrupted
+   */
+  @SuppressWarnings("unchecked")
+  public void write(String namedOutput, K key, V value)
+      throws IOException, InterruptedException {
+    if (!namedOutputs.containsKey(namedOutput)) {
+      throw new IllegalArgumentException("Undefined named output '" +
+          namedOutput + "'");
+    }
+    TaskAttemptContext taskContext = getContext(namedOutput);
+    baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1);
+    getRecordWriter(taskContext, namedOutput).write(key, value);
+  }
+
+  /**
+   * Closes every record writer that has been opened so far. All writers are attempted even
+   * if one of them fails with an {@code IOException}; the first such failure is rethrown once
+   * the remaining writers have been closed.
+   *
+   * @throws IOException the first failure reported while closing a writer
+   * @throws InterruptedException if closing a writer is interrupted
+   */
+  public void close() throws IOException, InterruptedException {
+    IOException firstFailure = null;
+    for (RecordWriter<?, ?> writer : recordWriters.values()) {
+      try {
+        writer.close(baseContext);
+      } catch (IOException e) {
+        // Keep going so one broken output does not leave the others unclosed.
+        if (firstFailure == null) {
+          firstFailure = e;
+        }
+      }
+    }
+    if (firstFailure != null) {
+      throw firstFailure;
+    }
+  }
+
+  /**
+   * Returns the cached task attempt context for a named output, building it on first use.
+   *
+   * @param nameOutput the name of a registered output
+   * @return the task attempt context configured for that output
+   * @throws IOException if the temporary {@code Job} cannot be created
+   */
+  private TaskAttemptContext getContext(String nameOutput) throws IOException {
+    TaskAttemptContext taskContext = taskContextCache.get(nameOutput);
+    if (taskContext != null) {
+      return taskContext;
+    }
+
+    // The following trick leverages the instantiation of a record writer via
+    // the job thus supporting arbitrary output formats.
+    OutputConfig outConfig = namedOutputs.get(nameOutput);
+    // Work on a copy so the per-output settings never touch the task's own configuration.
+    Configuration conf = new Configuration(baseContext.getConfiguration());
+    Job job = new Job(conf);
+    job.getConfiguration().set("crunch.namedoutput", nameOutput);
+    job.setOutputFormatClass(outConfig.bundle.getFormatClass());
+    job.setOutputKeyClass(outConfig.keyClass);
+    job.setOutputValueClass(outConfig.valueClass);
+    outConfig.bundle.configure(job.getConfiguration());
+    taskContext = TaskAttemptContextFactory.create(
+        job.getConfiguration(), baseContext.getTaskAttemptID());
+
+    taskContextCache.put(nameOutput, taskContext);
+    return taskContext;
+  }
+
+  /**
+   * Returns the cached record writer for a named output, creating it on first use from the
+   * output format class configured on the given task context.
+   *
+   * @param taskContext the task attempt context built for the named output
+   * @param namedOutput the name of the output; also stored as the base output name
+   * @return the record writer for the named output
+   * @throws IOException if the writer cannot be created or the format class is missing
+   * @throws InterruptedException if creating the writer is interrupted
+   */
+  private synchronized RecordWriter<K, V> getRecordWriter(
+      TaskAttemptContext taskContext, String namedOutput)
+      throws IOException, InterruptedException {
+    // look for record-writer in the cache
+    RecordWriter<K, V> writer = recordWriters.get(namedOutput);
+
+    // If not in cache, create a new one
+    if (writer == null) {
+      // get the record writer from context output format
+      taskContext.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
+      try {
+        OutputFormat format = ReflectionUtils.newInstance(
+            taskContext.getOutputFormatClass(),
+            taskContext.getConfiguration());
+        writer = format.getRecordWriter(taskContext);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+      recordWriters.put(namedOutput, writer);
+    }
+
+    return writer;
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/FileNamingScheme.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FileNamingScheme.java b/crunch-core/src/main/java/org/apache/crunch/io/FileNamingScheme.java
new file mode 100644
index 0000000..cf93651
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/FileNamingScheme.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Encapsulates rules for naming output files. It is the responsibility of
+ * implementors to avoid file name collisions.
+ *
+ * <p>Map output and reduce output are named by separate methods; only the
+ * reduce variant receives a partition id.
+ */
+public interface FileNamingScheme {
+
+ /**
+ * Get the output file name for a map task. Note that the implementation is
+ * responsible for avoiding naming collisions.
+ *
+ * @param configuration The configuration of the job for which the map output
+ * is being written
+ * @param outputDirectory The directory where the output will be written
+ * @return The filename for the output of the map task
+ * @throws IOException if an exception occurs while accessing the output file
+ * system
+ */
+ String getMapOutputName(Configuration configuration, Path outputDirectory) throws IOException;
+
+ /**
+ * Get the output file name for a reduce task. Note that the implementation is
+ * responsible for avoiding naming collisions.
+ *
+ * @param configuration The configuration of the job for which output is being
+ * written
+ * @param outputDirectory The directory where the file will be written
+ * @param partitionId The partition of the reduce task being output
+ * @return The filename for the output of the reduce task
+ * @throws IOException if an exception occurs while accessing the output file
+ * system
+ */
+ String getReduceOutputName(Configuration configuration, Path outputDirectory, int partitionId) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/FileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/FileReaderFactory.java
new file mode 100644
index 0000000..5cccb7b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/FileReaderFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A factory that produces an iterator over values of type {@code T} for a
+ * path on a file system.
+ *
+ * @param <T> the type of the values returned by the iterator
+ */
+public interface FileReaderFactory<T> {
+ /**
+ * Returns an iterator over the values found at the given path.
+ *
+ * <p>NOTE(review): no checked exception is declared, so implementations
+ * presumably report I/O failures some other way; confirm against the
+ * implementing classes.
+ *
+ * @param fs the file system that contains {@code path}
+ * @param path the location to read from
+ * @return an iterator over the values at {@code path}
+ */
+ Iterator<T> read(FileSystem fs, Path path);
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
new file mode 100644
index 0000000..d969009
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A combination of an {@link InputFormat} or {@link OutputFormat} and any extra
+ * configuration information that format class needs to run.
+ *
+ * <p>The {@code FormatBundle} allow us to let different formats act as
+ * if they are the only format that exists in a particular MapReduce job, even
+ * when we have multiple types of inputs and outputs within a single job.
+ */
+public class FormatBundle<K> implements Serializable {
+
+ private Class<K> formatClass;
+ private Map<String, String> extraConf;
+
+ public static <T> FormatBundle<T> fromSerialized(String serialized, Class<T> clazz) {
+ ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized));
+ try {
+ ObjectInputStream ois = new ObjectInputStream(bais);
+ FormatBundle<T> bundle = (FormatBundle<T>) ois.readObject();
+ ois.close();
+ return bundle;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static <T extends InputFormat<?, ?>> FormatBundle<T> forInput(Class<T> inputFormatClass) {
+ return new FormatBundle<T>(inputFormatClass);
+ }
+
+ public static <T extends OutputFormat<?, ?>> FormatBundle<T> forOutput(Class<T> inputFormatClass) {
+ return new FormatBundle<T>(inputFormatClass);
+ }
+
+ private FormatBundle(Class<K> formatClass) {
+ this.formatClass = formatClass;
+ this.extraConf = Maps.newHashMap();
+ }
+
+ public FormatBundle<K> set(String key, String value) {
+ this.extraConf.put(key, value);
+ return this;
+ }
+
+ public Class<K> getFormatClass() {
+ return formatClass;
+ }
+
+ public Configuration configure(Configuration conf) {
+ for (Map.Entry<String, String> e : extraConf.entrySet()) {
+ conf.set(e.getKey(), e.getValue());
+ }
+ return conf;
+ }
+
+ public String serialize() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(this);
+ oos.close();
+ return Base64.encodeBase64String(baos.toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getName() {
+ return formatClass.getSimpleName();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().append(formatClass).append(extraConf).toHashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || !(other instanceof FormatBundle)) {
+ return false;
+ }
+ FormatBundle<K> oib = (FormatBundle<K>) other;
+ return formatClass.equals(oib.formatClass) && extraConf.equals(oib.extraConf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/From.java b/crunch-core/src/main/java/org/apache/crunch/io/From.java
new file mode 100644
index 0000000..e4cfb6a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/From.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.io.avro.AvroFileSource;
+import org.apache.crunch.io.impl.FileTableSourceImpl;
+import org.apache.crunch.io.seq.SeqFileSource;
+import org.apache.crunch.io.seq.SeqFileTableSource;
+import org.apache.crunch.io.text.TextFileSource;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * <p>Static factory methods for creating common {@link Source} types.</p>
+ *
+ * <p>The {@code From} class is intended to provide a literate API for creating
+ * Crunch pipelines from common input file types.</p>
+ *
+ * <pre>
+ * {@code
+ * Pipeline pipeline = new MRPipeline(this.getClass());
+ *
+ * // Reference the lines of a text file by wrapping the TextInputFormat class.
+ * PCollection<String> lines = pipeline.read(From.textFile("/path/to/myfiles"));
+ *
+ * // Reference entries from a sequence file where the key is a LongWritable and the
+ * // value is a custom Writable class.
+ * PTable<LongWritable, MyWritable> table = pipeline.read(From.sequenceFile(
+ * "/path/to/seqfiles", LongWritable.class, MyWritable.class));
+ *
+ * // Reference the records from an Avro file, where MyAvroObject implements Avro's
+ * // SpecificRecord interface.
+ * PCollection<MyAvroObject> myObjects = pipeline.read(From.avroFile("/path/to/avrofiles",
+ * MyAvroObject.class));
+ *
+ * // References the key-value pairs from a custom extension of FileInputFormat:
+ * PTable<KeyWritable, ValueWritable> custom = pipeline.read(From.formattedFile(
+ * "/custom", MyFileInputFormat.class, KeyWritable.class, ValueWritable.class));
+ * }
+ * </pre>
+ */
+public class From {
+
+ /**
+ * Creates a {@code TableSource<K, V>} for reading data from files that have custom
+ * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource}
+ * and {@code Source} factory methods.
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param formatClass The {@code FileInputFormat} implementation
+ * @param keyClass The {@code Writable} to use for the key
+ * @param valueClass The {@code Writable} to use for the value
+ * @return A new {@code TableSource<K, V>} instance
+ */
+ public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile(
+ String pathName, Class<? extends FileInputFormat<K, V>> formatClass,
+ Class<K> keyClass, Class<V> valueClass) {
+ return formattedFile(new Path(pathName), formatClass, keyClass, valueClass);
+ }
+
+ /**
+ * Creates a {@code TableSource<K, V>} for reading data from files that have custom
+ * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource}
+ * and {@code Source} factory methods.
+ *
+ * @param path The {@code Path} to the data
+ * @param formatClass The {@code FileInputFormat} implementation
+ * @param keyClass The {@code Writable} to use for the key
+ * @param valueClass The {@code Writable} to use for the value
+ * @return A new {@code TableSource<K, V>} instance
+ */
+ public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile(
+ Path path, Class<? extends FileInputFormat<K, V>> formatClass,
+ Class<K> keyClass, Class<V> valueClass) {
+ return formattedFile(path, formatClass, Writables.writables(keyClass),
+ Writables.writables(valueClass));
+ }
+
+ /**
+ * Creates a {@code TableSource<K, V>} for reading data from files that have custom
+ * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
+ * and {@code Source} factory methods.
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param formatClass The {@code FileInputFormat} implementation
+ * @param keyType The {@code PType} to use for the key
+ * @param valueType The {@code PType} to use for the value
+ * @return A new {@code TableSource<K, V>} instance
+ */
+ public static <K, V> TableSource<K, V> formattedFile(String pathName,
+ Class<? extends FileInputFormat<?, ?>> formatClass,
+ PType<K> keyType, PType<V> valueType) {
+ return formattedFile(new Path(pathName), formatClass, keyType, valueType);
+ }
+
+ /**
+ * Creates a {@code TableSource<K, V>} for reading data from files that have custom
+ * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
+ * and {@code Source} factory methods.
+ *
+ * @param path The {@code Path} to the data
+ * @param formatClass The {@code FileInputFormat} implementation
+ * @param keyType The {@code PType} to use for the key
+ * @param valueType The {@code PType} to use for the value
+ * @return A new {@code TableSource<K, V>} instance
+ */
+ public static <K, V> TableSource<K, V> formattedFile(Path path,
+ Class<? extends FileInputFormat<?, ?>> formatClass,
+ PType<K> keyType, PType<V> valueType) {
+ PTableType<K, V> tableType = keyType.getFamily().tableOf(keyType, valueType);
+ return new FileTableSourceImpl<K, V>(path, tableType, formatClass);
+ }
+
+ /**
+ * Creates a {@code Source<T>} instance from the Avro file(s) at the given path name.
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
+ * @return A new {@code Source<T>} instance
+ */
+ public static <T extends SpecificRecord> Source<T> avroFile(String pathName, Class<T> avroClass) {
+ return avroFile(new Path(pathName), avroClass);
+ }
+
+ /**
+ * Creates a {@code Source<T>} instance from the Avro file(s) at the given {@code Path}.
+ *
+ * @param path The {@code Path} to the data
+ * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
+ * @return A new {@code Source<T>} instance
+ */
+ public static <T extends SpecificRecord> Source<T> avroFile(Path path, Class<T> avroClass) {
+ return avroFile(path, Avros.specifics(avroClass));
+ }
+
+ /**
+ * Creates a {@code Source<T>} instance from the Avro file(s) at the given path name.
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param avroType The {@code AvroType} for the Avro records
+ * @return A new {@code Source<T>} instance
+ */
+ public static <T> Source<T> avroFile(String pathName, AvroType<T> avroType) {
+ return avroFile(new Path(pathName), avroType);
+ }
+
+ /**
+ * Creates a {@code Source<T>} instance from the Avro file(s) at the given {@code Path}.
+ *
+ * @param path The {@code Path} to the data
+ * @param avroType The {@code AvroType} for the Avro records
+ * @return A new {@code Source<T>} instance
+ */
+ public static <T> Source<T> avroFile(Path path, AvroType<T> avroType) {
+ return new AvroFileSource<T>(path, avroType);
+ }
+
+ /**
+ * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given path name
+ * from the value field of each key-value pair in the SequenceFile(s).
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
+ * @return A new {@code Source<T>} instance
+ */
+ public static <T extends Writable> Source<T> sequenceFile(String pathName, Class<T> valueClass) {
+ return sequenceFile(new Path(pathName), valueClass);
+ }
+
+ /**
+ * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given {@code Path}
+ * from the value field of each key-value pair in the SequenceFile(s).
+ *
+ * @param path The {@code Path} to the data
+ * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
+ * @return A new {@code Source<T>} instance
+ */
+ public static <T extends Writable> Source<T> sequenceFile(Path path, Class<T> valueClass) {
+ return sequenceFile(path, Writables.writables(valueClass));
+ }
+
+ /**
+ * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given path name
+ * from the value field of each key-value pair in the SequenceFile(s).
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param ptype The {@code PType} for the value of the SequenceFile entry
+ * @return A new {@code Source<T>} instance
+ */
+ public static <T> Source<T> sequenceFile(String pathName, PType<T> ptype) {
+ return sequenceFile(new Path(pathName), ptype);
+ }
+
+ /**
+ * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given {@code Path}
+ * from the value field of each key-value pair in the SequenceFile(s).
+ *
+ * @param path The {@code Path} to the data
+ * @param ptype The {@code PType} for the value of the SequenceFile entry
+ * @return A new {@code Source<T>} instance
+ */
+ public static <T> Source<T> sequenceFile(Path path, PType<T> ptype) {
+ return new SeqFileSource<T>(path, ptype);
+ }
+
+ /**
+ * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given path name.
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param keyClass The {@code Writable} subclass for the key of the SequenceFile entry
+ * @param valueClass The {@code Writable} subclass for the value of the SequenceFile entry
+ * @return A new {@code TableSource<K, V>} instance
+ */
+ public static <K extends Writable, V extends Writable> TableSource<K, V> sequenceFile(
+ String pathName, Class<K> keyClass, Class<V> valueClass) {
+ return sequenceFile(new Path(pathName), keyClass, valueClass);
+ }
+
+ /**
+ * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given {@code Path}.
+ *
+ * @param path The {@code Path} to the data
+ * @param keyClass The {@code Writable} subclass for the key of the SequenceFile entry
+ * @param valueClass The {@code Writable} subclass for the value of the SequenceFile entry
+ * @return A new {@code TableSource<K, V>} instance
+ */
+ public static <K extends Writable, V extends Writable> TableSource<K, V> sequenceFile(
+ Path path, Class<K> keyClass, Class<V> valueClass) {
+ return sequenceFile(path, Writables.writables(keyClass), Writables.writables(valueClass));
+ }
+
+ /**
+ * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given path name.
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param keyType The {@code PType} for the key of the SequenceFile entry
+ * @param valueType The {@code PType} for the value of the SequenceFile entry
+ * @return A new {@code TableSource<K, V>} instance
+ */
+ public static <K, V> TableSource<K, V> sequenceFile(String pathName, PType<K> keyType, PType<V> valueType) {
+ return sequenceFile(new Path(pathName), keyType, valueType);
+ }
+
+ /**
+ * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given {@code Path}.
+ *
+ * @param path The {@code Path} to the data
+ * @param keyType The {@code PType} for the key of the SequenceFile entry
+ * @param valueType The {@code PType} for the value of the SequenceFile entry
+ * @return A new {@code TableSource<K, V>} instance
+ */
+ public static <K, V> TableSource<K, V> sequenceFile(Path path, PType<K> keyType, PType<V> valueType) {
+ PTypeFamily ptf = keyType.getFamily();
+ return new SeqFileTableSource<K, V>(path, ptf.tableOf(keyType, valueType));
+ }
+
+ /**
+ * Creates a {@code Source<String>} instance for the text file(s) at the given path name.
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @return A new {@code Source<String>} instance
+ */
+ public static Source<String> textFile(String pathName) {
+ return textFile(new Path(pathName));
+ }
+
+ /**
+ * Creates a {@code Source<String>} instance for the text file(s) at the given {@code Path}.
+ *
+ * @param path The {@code Path} to the data
+ * @return A new {@code Source<String>} instance
+ */
+ public static Source<String> textFile(Path path) {
+ return textFile(path, Writables.strings());
+ }
+
+ /**
+ * Creates a {@code Source<T>} instance for the text file(s) at the given path name using
+ * the provided {@code PType<T>} to convert the input text.
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param ptype The {@code PType<T>} to use to process the input text
+ * @return A new {@code Source<T>} instance
+ */
+ public static <T> Source<T> textFile(String pathName, PType<T> ptype) {
+ return textFile(new Path(pathName), ptype);
+ }
+
+ /**
+ * Creates a {@code Source<T>} instance for the text file(s) at the given {@code Path} using
+ * the provided {@code PType<T>} to convert the input text.
+ *
+ * @param path The {@code Path} to the data
+ * @param ptype The {@code PType<T>} to use to process the input text
+ * @return A new {@code Source<T>} instance
+ */
+ public static <T> Source<T> textFile(Path path, PType<T> ptype) {
+ return new TextFileSource<T>(path, ptype);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/MapReduceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/MapReduceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/MapReduceTarget.java
new file mode 100644
index 0000000..b484103
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/MapReduceTarget.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+/**
+ * A {@link Target} that can configure a MapReduce {@link Job} for its output.
+ *
+ * <p>{@link #configureForMapReduce} is given the job to configure, the {@link PType}
+ * of the data being written, an output path and an output name. NOTE(review): the
+ * {@code PathTargetImpl} implementation treats a {@code null} name as "configure the
+ * job's main output" and a non-null name as a named output registered through
+ * {@code CrunchOutputs}; confirm whether that is part of this interface's contract.
+ */
+public interface MapReduceTarget extends Target {
+ void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name);
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/OutputHandler.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/OutputHandler.java b/crunch-core/src/main/java/org/apache/crunch/io/OutputHandler.java
new file mode 100644
index 0000000..01d7f99
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/OutputHandler.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.types.PType;
+/**
+ * A handler that is offered a {@link Target} together with the {@link PType} of the
+ * data destined for it.
+ *
+ * <p>NOTE(review): {@link #configure} presumably returns {@code true} when the handler
+ * was able to set up output for the given target and {@code false} otherwise; the
+ * meaning of the return value is not visible here, so confirm against implementations.
+ */
+public interface OutputHandler {
+ boolean configure(Target target, PType<?> ptype);
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
new file mode 100644
index 0000000..7a35209
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A target whose output goes to a given path on a file system.
+ *
+ * <p>On top of the inherited {@link MapReduceTarget} contract, a path target exposes
+ * its {@link Path} and the {@link FileNamingScheme} used for its output files.
+ */
+public interface PathTarget extends MapReduceTarget {
+ /** Returns the {@link Path} associated with this target. */
+ Path getPath();
+
+ /**
+ * Get the naming scheme to be used for outputs being written to an output
+ * path.
+ *
+ * @return the naming scheme to be used
+ */
+ FileNamingScheme getFileNamingScheme();
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java
new file mode 100644
index 0000000..0be3f9a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Partial {@link PathTarget} implementation that remembers a destination {@link Path}
+ * together with the output format, key class and value class used to configure a job.
+ * {@code getFileNamingScheme()} is left to subclasses.
+ */
+public abstract class PathTargetImpl implements PathTarget {
+
+  private final Path path;
+  private final Class<OutputFormat> outputFormatClass;
+  private final Class keyClass;
+  private final Class valueClass;
+
+  /**
+   * Creates a target from an already constructed {@link Path}.
+   */
+  public PathTargetImpl(Path path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) {
+    this.valueClass = valueClass;
+    this.keyClass = keyClass;
+    this.outputFormatClass = outputFormatClass;
+    this.path = path;
+  }
+
+  /**
+   * Convenience constructor that wraps the path name in a {@link Path}.
+   */
+  public PathTargetImpl(String path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) {
+    this(new Path(path), outputFormatClass, keyClass, valueClass);
+  }
+
+  /**
+   * Points the job's file output at this target's path and then registers the output
+   * format and key/value classes, either as the job's main output (when {@code name}
+   * is {@code null}) or as a named output via {@code CrunchOutputs}.
+   */
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    // NOTE(review): the outputPath argument is not used; the job's output path is
+    // always set to this target's own path. Confirm that this is intended.
+    try {
+      FileOutputFormat.setOutputPath(job, path);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    if (name != null) {
+      CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
+      return;
+    }
+
+    job.setOutputFormatClass(outputFormatClass);
+    job.setOutputKeyClass(keyClass);
+    job.setOutputValueClass(valueClass);
+  }
+
+  @Override
+  public Path getPath() {
+    return this.path;
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/ReadableSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/ReadableSource.java b/crunch-core/src/main/java/org/apache/crunch/io/ReadableSource.java
new file mode 100644
index 0000000..0407167
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/ReadableSource.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.IOException;
+
+import org.apache.crunch.Source;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An extension of the {@code Source} interface that indicates that a
+ * {@code Source} instance may be read as a series of records by the client
+ * code. This is used to determine whether a {@code PCollection} instance can be
+ * materialized.
+ *
+ * @param <T> the type of the records produced when this source is read
+ */
+public interface ReadableSource<T> extends Source<T> {
+
+ /**
+ * Returns an {@code Iterable} that contains the contents of this source.
+ *
+ * @param conf The current {@code Configuration} instance
+ * @return the contents of this {@code Source} as an {@code Iterable} instance
+ * @throws IOException if an I/O error prevents the source from being read; the
+ * exact conditions depend on the implementation
+ */
+ Iterable<T> read(Configuration conf) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
new file mode 100644
index 0000000..95c90aa
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.SourceTarget;
+
+/**
+ * An interface that indicates that a {@code SourceTarget} instance can be read
+ * into the local client.
+ *
+ * <p>It declares no methods of its own; it only combines {@link ReadableSource}
+ * and {@link SourceTarget} into a single type.
+ *
+ * @param <T>
+ * The type of data read.
+ */
+public interface ReadableSourceTarget<T> extends ReadableSource<T>, SourceTarget<T> {
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java b/crunch-core/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
new file mode 100644
index 0000000..bdda8e6
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Default {@link FileNamingScheme} that uses an incrementing sequence number in
+ * order to generate unique file names.
+ *
+ * <p>The sequence number is the number of entries currently listed in the output
+ * directory, so generated names look like {@code part-m-00000} for map output and
+ * {@code part-r-00000} for reduce output.
+ */
+public class SequentialFileNamingScheme implements FileNamingScheme {
+
+  /**
+   * Returns the next sequential name for a map-side output file.
+   *
+   * @param configuration used to resolve the file system of {@code outputDirectory}
+   * @param outputDirectory directory whose current entry count determines the number
+   * @return a name of the form {@code part-m-NNNNN}
+   * @throws IOException if the file system cannot be resolved or listed
+   */
+  @Override
+  public String getMapOutputName(Configuration configuration, Path outputDirectory) throws IOException {
+    return getSequentialFileName(configuration, outputDirectory, "m");
+  }
+
+  /**
+   * Returns the next sequential name for a reduce-side output file. The
+   * {@code partitionId} argument is not used by this scheme.
+   *
+   * @param configuration used to resolve the file system of {@code outputDirectory}
+   * @param outputDirectory directory whose current entry count determines the number
+   * @param partitionId ignored
+   * @return a name of the form {@code part-r-NNNNN}
+   * @throws IOException if the file system cannot be resolved or listed
+   */
+  @Override
+  public String getReduceOutputName(Configuration configuration, Path outputDirectory, int partitionId)
+      throws IOException {
+    return getSequentialFileName(configuration, outputDirectory, "r");
+  }
+
+  /** Builds {@code part-<jobTypeName>-<count>} from the number of entries in the directory. */
+  private String getSequentialFileName(Configuration configuration, Path outputDirectory, String jobTypeName)
+      throws IOException {
+    FileSystem fileSystem = outputDirectory.getFileSystem(configuration);
+    // Depending on the Hadoop version and file system, listStatus may return null
+    // instead of throwing when the directory does not exist. Treat that as "no files
+    // yet" rather than failing with a NullPointerException.
+    org.apache.hadoop.fs.FileStatus[] existing = fileSystem.listStatus(outputDirectory);
+    int fileSequenceNumber = (existing == null) ? 0 : existing.length;
+
+    return String.format("part-%s-%05d", jobTypeName, fileSequenceNumber);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java b/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
new file mode 100644
index 0000000..f4400de
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Functions for configuring the inputs/outputs of MapReduce jobs.
+ *
+ */
+public class SourceTargetHelper {
+
+ public static long getPathSize(Configuration conf, Path path) throws IOException {
+ return getPathSize(path.getFileSystem(conf), path);
+ }
+
+ public static long getPathSize(FileSystem fs, Path path) throws IOException {
+ FileStatus[] stati = fs.globStatus(path);
+ if (stati == null || stati.length == 0) {
+ return -1L;
+ }
+ long size = 0;
+ for (FileStatus status : stati) {
+ size += fs.getContentSummary(status.getPath()).getLength();
+ }
+ return size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/To.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/To.java b/crunch-core/src/main/java/org/apache/crunch/io/To.java
new file mode 100644
index 0000000..d62d294
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/To.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.io.avro.AvroFileTarget;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.io.seq.SeqFileTarget;
+import org.apache.crunch.io.text.TextFileTarget;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * <p>Static factory methods for creating common {@link Target} types.</p>
+ *
+ * <p>The {@code To} class is intended to be used as part of a literate API
+ * for writing the output of Crunch pipelines to common file types. We can use
+ * the {@code Target} objects created by the factory methods in the {@code To}
+ * class with either the {@code write} method on the {@code Pipeline} class or
+ * the convenience {@code write} method on {@code PCollection} and {@code PTable}
+ * instances.</p>
+ *
+ * <pre>{@code
+ * Pipeline pipeline = new MRPipeline(this.getClass());
+ * ...
+ * // Write a PCollection<String> to a text file:
+ * PCollection<String> words = ...;
+ * pipeline.write(words, To.textFile("/put/my/words/here"));
+ *
+ * // Write a PTable<Text, Text> to a sequence file:
+ * PTable<Text, Text> textToText = ...;
+ * textToText.write(To.sequenceFile("/words/to/words"));
+ *
+ * // Write a PCollection<MyAvroObject> to an Avro data file:
+ * PCollection<MyAvroObject> objects = ...;
+ * objects.write(To.avroFile("/my/avro/files"));
+ *
+ * // Write a PTable to a custom FileOutputFormat:
+ * PTable<KeyWritable, ValueWritable> custom = ...;
+ * pipeline.write(custom, To.formattedFile("/custom", MyFileFormat.class));
+ * }</pre>
+ *
+ * <p>Every factory method that takes a path name simply wraps it in a {@link Path}
+ * and delegates to the corresponding {@code Path}-based overload.</p>
+ */
+public class To {
+
+ /**
+ * Creates a {@code Target} at the given path name that writes data to
+ * a custom {@code FileOutputFormat}.
+ *
+ * @param pathName The name of the path to write the data to on the filesystem
+ * @param formatClass The {@code FileOutputFormat<K, V>} to write the data to
+ * @return A new {@code Target} instance
+ */
+ public static <K extends Writable, V extends Writable> Target formattedFile(
+ String pathName, Class<? extends FileOutputFormat<K, V>> formatClass) {
+ return formattedFile(new Path(pathName), formatClass);
+ }
+
+ /**
+ * Creates a {@code Target} at the given {@code Path} that writes data to
+ * a custom {@code FileOutputFormat}. The returned target uses a new
+ * {@link SequentialFileNamingScheme}.
+ *
+ * @param path The {@code Path} to write the data to
+ * @param formatClass The {@code FileOutputFormat} to write the data to
+ * @return A new {@code Target} instance
+ */
+ public static <K extends Writable, V extends Writable> Target formattedFile(
+ Path path, Class<? extends FileOutputFormat<K, V>> formatClass) {
+ return new FileTargetImpl(path, formatClass, new SequentialFileNamingScheme());
+ }
+
+ /**
+ * Creates a {@code Target} at the given path name that writes data to
+ * Avro files. The {@code PType} for the written data must be for Avro records.
+ *
+ * @param pathName The name of the path to write the data to on the filesystem
+ * @return A new {@code Target} instance
+ */
+ public static Target avroFile(String pathName) {
+ return avroFile(new Path(pathName));
+ }
+
+ /**
+ * Creates a {@code Target} at the given {@code Path} that writes data to
+ * Avro files. The {@code PType} for the written data must be for Avro records.
+ *
+ * @param path The {@code Path} to write the data to
+ * @return A new {@code Target} instance
+ */
+ public static Target avroFile(Path path) {
+ return new AvroFileTarget(path);
+ }
+
+ /**
+ * Creates a {@code Target} at the given path name that writes data to
+ * SequenceFiles.
+ *
+ * @param pathName The name of the path to write the data to on the filesystem
+ * @return A new {@code Target} instance
+ */
+ public static Target sequenceFile(String pathName) {
+ return sequenceFile(new Path(pathName));
+ }
+
+ /**
+ * Creates a {@code Target} at the given {@code Path} that writes data to
+ * SequenceFiles.
+ *
+ * @param path The {@code Path} to write the data to
+ * @return A new {@code Target} instance
+ */
+ public static Target sequenceFile(Path path) {
+ return new SeqFileTarget(path);
+ }
+
+ /**
+ * Creates a {@code Target} at the given path name that writes data to
+ * text files.
+ *
+ * @param pathName The name of the path to write the data to on the filesystem
+ * @return A new {@code Target} instance
+ */
+ public static Target textFile(String pathName) {
+ return textFile(new Path(pathName));
+ }
+
+ /**
+ * Creates a {@code Target} at the given {@code Path} that writes data to
+ * text files.
+ *
+ * @param path The {@code Path} to write the data to
+ * @return A new {@code Target} instance
+ */
+ public static Target textFile(Path path) {
+ return new TextFileTarget(path);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
new file mode 100644
index 0000000..c8fe23a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.AutoClosingIterator;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * A {@link FileReaderFactory} that reads the records of an Avro data file and passes
+ * each one through a {@link MapFn} before handing it to the caller.
+ *
+ * @param <T> the type of the records returned by {@link #read}
+ */
+public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
+
+  private static final Log LOG = LogFactory.getLog(AvroFileReaderFactory.class);
+
+  private final DatumReader<T> recordReader;
+  private final MapFn<T, T> mapFn;
+
+  /**
+   * Creates a factory whose datum reader and input mapping function are both derived
+   * from the given Avro type.
+   */
+  @SuppressWarnings("unchecked")
+  public AvroFileReaderFactory(AvroType<T> atype) {
+    this.recordReader = createDatumReader(atype);
+    this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
+  }
+
+  /**
+   * Creates a factory that reads generic records for the given schema and returns
+   * them unchanged.
+   */
+  public AvroFileReaderFactory(Schema schema) {
+    this.recordReader = new GenericDatumReader<T>(schema);
+    this.mapFn = IdentityFn.<T>getInstance();
+  }
+
+  /**
+   * Picks the datum reader matching the Avro type: reflect types get a
+   * {@link ReflectDatumReader}, specific types a {@link SpecificDatumReader}, and
+   * everything else a {@link GenericDatumReader}. When a type is both reflect and
+   * specific, {@code Avros.checkCombiningSpecificAndReflectionSchemas()} is invoked
+   * before the reflect reader is created.
+   */
+  static <T> DatumReader<T> createDatumReader(AvroType<T> avroType) {
+    if (avroType.hasReflect()) {
+      if (avroType.hasSpecific()) {
+        Avros.checkCombiningSpecificAndReflectionSchemas();
+      }
+      return new ReflectDatumReader<T>(avroType.getSchema());
+    } else if (avroType.hasSpecific()) {
+      return new SpecificDatumReader<T>(avroType.getSchema());
+    } else {
+      return new GenericDatumReader<T>(avroType.getSchema());
+    }
+  }
+
+  /**
+   * Opens the Avro file at {@code path} and returns an iterator over its records, each
+   * passed through the mapping function.
+   *
+   * <p>This is best-effort: if the file cannot be opened, the failure is logged and an
+   * empty iterator is returned instead of propagating the exception.
+   *
+   * @param fs supplies the configuration used to open the file
+   * @param path location of the Avro data file
+   * @return an iterator over the mapped records, or an empty iterator on I/O failure
+   */
+  @Override
+  public Iterator<T> read(FileSystem fs, final Path path) {
+    this.mapFn.initialize();
+    FsInput fsi = null;
+    try {
+      fsi = new FsInput(path, fs.getConf());
+      final DataFileReader<T> reader = new DataFileReader<T>(fsi, recordReader);
+      return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
+        @Override
+        public boolean hasNext() {
+          return reader.hasNext();
+        }
+
+        @Override
+        public T next() {
+          return mapFn.map(reader.next());
+        }
+      });
+    } catch (IOException e) {
+      LOG.info("Could not read avro file at path: " + path, e);
+      // The input may already be open (for example when the DataFileReader constructor
+      // rejects the file); nothing else owns it at this point, so release it here.
+      closeQuietly(fsi);
+      return Iterators.emptyIterator();
+    }
+  }
+
+  /** Closes the input if it was opened, ignoring any secondary failure from close(). */
+  private static void closeQuietly(FsInput input) {
+    if (input == null) {
+      return;
+    }
+    try {
+      input.close();
+    } catch (IOException ignored) {
+      // The original read failure has already been logged; a close failure adds nothing.
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
new file mode 100644
index 0000000..15792bf
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroJob;
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.types.avro.AvroInputFormat;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A file-based source of Avro data that can also be read directly by client code
+ * through {@link #read(Configuration)}.
+ */
+public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
+
+  /**
+   * Builds the input {@link FormatBundle} for the given Avro type: the reflect flag,
+   * the schema, and the class name of the reflect data factory.
+   */
+  private static <S> FormatBundle getBundle(AvroType<S> ptype) {
+    FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class);
+    bundle = bundle.set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect()));
+    bundle = bundle.set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString());
+    bundle = bundle.set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName());
+    return bundle;
+  }
+
+  public AvroFileSource(Path path, AvroType<T> ptype) {
+    super(path, ptype, getBundle(ptype));
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder description = new StringBuilder("Avro(");
+    description.append(path.toString()).append(")");
+    return description.toString();
+  }
+
+  /**
+   * Reads the records under this source's path using an {@link AvroFileReaderFactory}
+   * built from this source's Avro type.
+   */
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    AvroFileReaderFactory<T> readerFactory = new AvroFileReaderFactory<T>((AvroType<T>) ptype);
+    return CompositePathIterable.create(fs, path, readerFactory);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
new file mode 100644
index 0000000..76103e5
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.fs.Path;
+
+public class AvroFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+ public AvroFileSourceTarget(Path path, AvroType<T> atype) {
+ this(path, atype, new SequentialFileNamingScheme());
+ }
+
+ public AvroFileSourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) {
+ super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path), fileNamingScheme);
+ }
+
+ @Override
+ public String toString() {
+ return target.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
new file mode 100644
index 0000000..3a9e42c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroOutputFormat;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * A file target that writes with {@link AvroOutputFormat}. Only {@link AvroType} outputs are
+ * accepted, and their schema is recorded in the job configuration.
+ */
+public class AvroFileTarget extends FileTargetImpl {
+
+  /** Creates a target for the location named by {@code path}. */
+  public AvroFileTarget(String path) {
+    this(new Path(path));
+  }
+
+  /** Creates a target for {@code path} that uses a {@link SequentialFileNamingScheme}. */
+  public AvroFileTarget(Path path) {
+    this(path, new SequentialFileNamingScheme());
+  }
+
+  /** Creates a target for {@code path} with an explicit file naming scheme. */
+  public AvroFileTarget(Path path, FileNamingScheme fileNamingScheme) {
+    super(path, AvroOutputFormat.class, fileNamingScheme);
+  }
+
+  /** Returns {@code Avro(<path>)}. */
+  @Override
+  public String toString() {
+    return new StringBuilder("Avro(").append(path.toString()).append(")").toString();
+  }
+
+  /**
+   * Configures the handler with this target when {@code ptype} is an {@link AvroType}.
+   *
+   * @return {@code true} if the type was accepted and the handler configured, {@code false}
+   *         otherwise (the handler is left untouched)
+   */
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    if (ptype instanceof AvroType) {
+      handler.configure(this, ptype);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Stores the output schema in the job configuration and then registers the Avro output.
+   * The schema is kept under {@code avro.output.schema} for an unnamed output and under
+   * {@code avro.output.schema.<name>} for a named one.
+   *
+   * @param ptype the output type; it is cast to {@link AvroType}
+   * @param name the output name, or {@code null} for the unnamed output
+   * @throws IllegalStateException if a different schema is already stored under the same key
+   */
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    AvroType<?> avroType = (AvroType<?>) ptype;
+    Configuration conf = job.getConfiguration();
+    String schemaKey = (name == null) ? "avro.output.schema" : "avro.output.schema." + name;
+    String configuredSchema = conf.get(schemaKey);
+    String requestedSchema = avroType.getSchema().toString();
+    if (configuredSchema == null) {
+      conf.set(schemaKey, requestedSchema);
+    } else if (!configuredSchema.equals(requestedSchema)) {
+      throw new IllegalStateException("Avro targets must use the same output schema");
+    }
+    Avros.configureReflectDataFactory(conf);
+    configureForMapReduce(job, AvroWrapper.class, NullWritable.class, AvroOutputFormat.class,
+        outputPath, name);
+  }
+
+  /**
+   * Returns an {@link AvroFileSourceTarget} over this target's path when {@code ptype} is an
+   * {@link AvroType}, and {@code null} for any other type.
+   */
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (!(ptype instanceof AvroType)) {
+      return null;
+    }
+    return new AvroFileSourceTarget<T>(path, (AvroType<T>) ptype);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
new file mode 100644
index 0000000..3bd802e
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+import com.google.common.collect.UnmodifiableIterator;
+import com.google.common.io.Closeables;
+
+/**
+ * Closes the wrapped {@code Closeable} when {@link #hasNext()} returns false. As long as a client loops through to
+ * completion (doesn't abort early due to an exception, short circuit, etc.) resources will be closed automatically.
+ * A client that stops iterating early can release the resource by calling {@link #close()} itself.
+ */
+public class AutoClosingIterator<T> extends UnmodifiableIterator<T> implements Closeable {
+  private final Iterator<T> iter;
+  // Reset to null once a close has been attempted, so the resource is closed at most once.
+  private Closeable closeable;
+
+  /**
+   * @param closeable the resource to close once {@code iter} is exhausted
+   * @param iter the iterator whose elements are handed out
+   */
+  public AutoClosingIterator(Closeable closeable, Iterator<T> iter) {
+    this.closeable = closeable;
+    this.iter = iter;
+  }
+
+  /**
+   * Reports whether the wrapped iterator has more elements. When it has none, the wrapped resource is closed
+   * quietly, so a failure to close is not propagated from this method.
+   */
+  @Override
+  public boolean hasNext() {
+    if (iter.hasNext()) {
+      return true;
+    }
+    Closeables.closeQuietly(this);
+    return false;
+  }
+
+  /** Returns the next element of the wrapped iterator. */
+  @Override
+  public T next() {
+    return iter.next();
+  }
+
+  /**
+   * Closes the wrapped resource the first time it is called; later calls do nothing.
+   *
+   * @throws IOException if closing the wrapped resource fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (closeable == null) {
+      return;
+    }
+    try {
+      closeable.close();
+    } finally {
+      // Mark as closed even when close() throws, so a failing resource is not closed again on every
+      // subsequent hasNext()/close() call.
+      closeable = null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
new file mode 100644
index 0000000..688c801
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.Source;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.SourceTargetHelper;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+public class FileSourceImpl<T> implements Source<T> {
+
+ private static final Log LOG = LogFactory.getLog(FileSourceImpl.class);
+
+ protected final Path path;
+ protected final PType<T> ptype;
+ protected final FormatBundle<? extends InputFormat> inputBundle;
+
+ public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) {
+ this.path = path;
+ this.ptype = ptype;
+ this.inputBundle = FormatBundle.forInput(inputFormatClass);
+ }
+
+ public FileSourceImpl(Path path, PType<T> ptype, FormatBundle<? extends InputFormat> inputBundle) {
+ this.path = path;
+ this.ptype = ptype;
+ this.inputBundle = inputBundle;
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ @Override
+ public void configureSource(Job job, int inputId) throws IOException {
+ if (inputId == -1) {
+ FileInputFormat.addInputPath(job, path);
+ job.setInputFormatClass(inputBundle.getFormatClass());
+ inputBundle.configure(job.getConfiguration());
+ } else {
+ CrunchInputs.addInputPath(job, path, inputBundle, inputId);
+ }
+ }
+
+ @Override
+ public PType<T> getType() {
+ return ptype;
+ }
+
+ @Override
+ public long getSize(Configuration configuration) {
+ try {
+ return SourceTargetHelper.getPathSize(configuration, path);
+ } catch (IOException e) {
+ LOG.warn(String.format("Exception thrown looking up size of: %s", path), e);
+ throw new IllegalStateException("Failed to get the file size of:" + path, e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || !getClass().equals(other.getClass())) {
+ return false;
+ }
+ FileSourceImpl o = (FileSourceImpl) other;
+ return ptype.equals(o.ptype) && path.equals(o.path) && inputBundle.equals(o.inputBundle);
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().append(ptype).append(path).append(inputBundle).toHashCode();
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append(inputBundle.getName()).append("(").append(path).append(")").toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
new file mode 100644
index 0000000..295edb5
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.types.PTableType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+public class FileTableSourceImpl<K, V> extends FileSourceImpl<Pair<K, V>> implements TableSource<K, V> {
+
+ public FileTableSourceImpl(Path path, PTableType<K, V> tableType, Class<? extends FileInputFormat> formatClass) {
+ super(path, tableType, formatClass);
+ }
+
+ public FileTableSourceImpl(Path path, PTableType<K, V> tableType, FormatBundle bundle) {
+ super(path, tableType, bundle);
+ }
+
+ @Override
+ public PTableType<K, V> getTableType() {
+ return (PTableType<K, V>) getType();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
new file mode 100644
index 0000000..c1c29e4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class FileTargetImpl implements PathTarget {
+
+ private static final Log LOG = LogFactory.getLog(FileTargetImpl.class);
+
+ protected final Path path;
+ private final Class<? extends FileOutputFormat> outputFormatClass;
+ private final FileNamingScheme fileNamingScheme;
+
+ public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
+ FileNamingScheme fileNamingScheme) {
+ this.path = path;
+ this.outputFormatClass = outputFormatClass;
+ this.fileNamingScheme = fileNamingScheme;
+ }
+
+ @Override
+ public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+ Converter converter = ptype.getConverter();
+ Class keyClass = converter.getKeyClass();
+ Class valueClass = converter.getValueClass();
+ configureForMapReduce(job, keyClass, valueClass, outputFormatClass, outputPath, name);
+ }
+
+ protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
+ Class outputFormatClass, Path outputPath, String name) {
+ try {
+ FileOutputFormat.setOutputPath(job, outputPath);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (name == null) {
+ job.setOutputFormatClass(outputFormatClass);
+ job.setOutputKeyClass(keyClass);
+ job.setOutputValueClass(valueClass);
+ } else {
+ CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
+ }
+ }
+
+ @Override
+ public boolean accept(OutputHandler handler, PType<?> ptype) {
+ handler.configure(this, ptype);
+ return true;
+ }
+
+ @Override
+ public Path getPath() {
+ return path;
+ }
+
+ @Override
+ public FileNamingScheme getFileNamingScheme() {
+ return fileNamingScheme;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || !getClass().equals(other.getClass())) {
+ return false;
+ }
+ FileTargetImpl o = (FileTargetImpl) other;
+ return path.equals(o.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().append(path).toHashCode();
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append(outputFormatClass.getSimpleName()).append("(").append(path).append(")")
+ .toString();
+ }
+
+ @Override
+ public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+ // By default, assume that we cannot do this.
+ return null;
+ }
+
+ @Override
+ public void handleExisting(WriteMode strategy, Configuration conf) {
+ FileSystem fs = null;
+ try {
+ fs = FileSystem.get(conf);
+ } catch (IOException e) {
+ LOG.error("Could not retrieve FileSystem object to check for existing path", e);
+ throw new CrunchRuntimeException(e);
+ }
+
+ boolean exists = false;
+ try {
+ exists = fs.exists(path);
+ } catch (IOException e) {
+ LOG.error("Exception checking existence of path: " + path, e);
+ throw new CrunchRuntimeException(e);
+ }
+
+ if (exists) {
+ switch (strategy) {
+ case DEFAULT:
+ LOG.error("Path " + path + " already exists!");
+ throw new CrunchRuntimeException("Path already exists: " + path);
+ case OVERWRITE:
+ LOG.info("Removing data at existing path: " + path);
+ try {
+ fs.delete(path, true);
+ } catch (IOException e) {
+ LOG.error("Exception thrown removing data at path: " + path, e);
+ }
+ break;
+ case APPEND:
+ LOG.info("Adding output files to existing path: " + path);
+ break;
+ default:
+ throw new CrunchRuntimeException("Unknown WriteMode: " + strategy);
+ }
+ } else {
+ LOG.info("Will write output files to new path: " + path);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
new file mode 100644
index 0000000..6506816
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.IOException;
+
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link SourcePathTargetImpl} whose source is a {@link ReadableSource}, so that it can
+ * implement {@link ReadableSourceTarget}.
+ */
+public class ReadableSourcePathTargetImpl<T> extends SourcePathTargetImpl<T> implements ReadableSourceTarget<T> {
+
+  /** Wraps a readable source and a path target that use the given naming scheme. */
+  public ReadableSourcePathTargetImpl(ReadableSource<T> source, PathTarget target, FileNamingScheme fileNamingScheme) {
+    super(source, target, fileNamingScheme);
+  }
+
+  /**
+   * Reads by delegating to the wrapped source.
+   *
+   * @throws IOException if the wrapped source fails to read
+   */
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    // The constructor only admits a ReadableSource, which is what this cast relies on.
+    ReadableSource<T> readable = (ReadableSource<T>) source;
+    return readable.read(conf);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
new file mode 100644
index 0000000..f435b3b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.IOException;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link SourceTargetImpl} whose source is a {@link ReadableSource}, so that it can
+ * implement {@link ReadableSourceTarget}.
+ */
+public class ReadableSourceTargetImpl<T> extends SourceTargetImpl<T> implements ReadableSourceTarget<T> {
+
+  /** Wraps a readable source together with a target. */
+  public ReadableSourceTargetImpl(ReadableSource<T> source, Target target) {
+    super(source, target);
+  }
+
+  /**
+   * Reads by delegating to the wrapped source.
+   *
+   * @throws IOException if the wrapped source fails to read
+   */
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    // The constructor only admits a ReadableSource, which is what this cast relies on.
+    ReadableSource<T> readable = (ReadableSource<T>) source;
+    return readable.read(conf);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
new file mode 100644
index 0000000..c0d7ce0
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * A {@link SourceTargetImpl} whose target is a {@link PathTarget}; the path-related calls are
+ * forwarded to that target, while the naming scheme is held by this class.
+ */
+public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements PathTarget {
+
+  private final FileNamingScheme fileNamingScheme;
+
+  /** Wraps a source and a path target and remembers the naming scheme to report. */
+  public SourcePathTargetImpl(Source<T> source, PathTarget target, FileNamingScheme fileNamingScheme) {
+    super(source, target);
+    this.fileNamingScheme = fileNamingScheme;
+  }
+
+  /** Forwards the job configuration to the wrapped path target. */
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    pathTarget().configureForMapReduce(job, ptype, outputPath, name);
+  }
+
+  /** Returns the path of the wrapped path target. */
+  @Override
+  public Path getPath() {
+    return pathTarget().getPath();
+  }
+
+  /** Returns the naming scheme given to the constructor. */
+  @Override
+  public FileNamingScheme getFileNamingScheme() {
+    return fileNamingScheme;
+  }
+
+  /** Views the wrapped target as the PathTarget the constructor required it to be. */
+  private PathTarget pathTarget() {
+    return (PathTarget) target;
+  }
+}