You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:12 UTC
[10/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch/src/main/java/org/apache/crunch/io/CrunchOutputs.java
deleted file mode 100644
index ccf4fb5..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
- * writing to multiple files within a single MapReduce job.
- *
- * <p>Named outputs are recorded in the job configuration by {@link #addNamedOutput}. In a
- * task, an instance created from the task context routes each {@link #write} call to the
- * {@code RecordWriter} of the requested named output. Per-output task contexts and record
- * writers are created lazily and cached by name; {@link #close()} closes every writer that
- * was opened.
- *
- * @param <K> the key type written through this instance
- * @param <V> the value type written through this instance
- */
-public class CrunchOutputs<K, V> {
-  /** Configuration key holding the serialized definitions of all named outputs. */
-  public static final String CRUNCH_OUTPUTS = "crunch.outputs.dir";
-
-  // Separates one named-output definition from the next inside CRUNCH_OUTPUTS.
-  private static final char RECORD_SEP = ',';
-  // Separates the fields of one definition: name, serialized bundle, key class, value class.
-  private static final char FIELD_SEP = ';';
-  private static final Joiner JOINER = Joiner.on(FIELD_SEP);
-  private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
-  // Number of FIELD_SEP-separated fields addNamedOutput writes for each definition.
-  private static final int FIELDS_PER_OUTPUT = 4;
-
-  /**
-   * Registers a named output backed by the given {@code OutputFormat} class with no extra
-   * configuration.
-   *
-   * @param job the job whose configuration records the named output
-   * @param name the name later passed to {@link #write}; must not contain ',' or ';'
-   * @param outputFormatClass the {@code OutputFormat} used to create the record writer
-   * @param keyClass the output key class
-   * @param valueClass the output value class
-   * @throws IllegalArgumentException if {@code name} contains a separator character
-   */
-  public static void addNamedOutput(Job job, String name,
-      Class<? extends OutputFormat> outputFormatClass,
-      Class keyClass, Class valueClass) {
-    addNamedOutput(job, name, FormatBundle.forOutput(outputFormatClass), keyClass, valueClass);
-  }
-
-  /**
-   * Registers a named output backed by the given {@code FormatBundle}, appending its
-   * definition to any definitions already stored under {@link #CRUNCH_OUTPUTS}.
-   *
-   * @param job the job whose configuration records the named output
-   * @param name the name later passed to {@link #write}; must not contain ',' or ';'
-   * @param outputBundle the output format plus any extra configuration it needs
-   * @param keyClass the output key class
-   * @param valueClass the output value class
-   * @throws IllegalArgumentException if {@code name} contains a separator character
-   */
-  public static void addNamedOutput(Job job, String name,
-      FormatBundle<? extends OutputFormat> outputBundle,
-      Class keyClass, Class valueClass) {
-    // The name is stored unescaped, so a separator inside it would corrupt the parse done by
-    // getNamedOutputs at task time; reject it here where the caller can still fix it.
-    if (name.indexOf(RECORD_SEP) >= 0 || name.indexOf(FIELD_SEP) >= 0) {
-      throw new IllegalArgumentException("Named output '" + name
-          + "' must not contain '" + RECORD_SEP + "' or '" + FIELD_SEP + "'");
-    }
-    Configuration conf = job.getConfiguration();
-    String definition = JOINER.join(name, outputBundle.serialize(), keyClass.getName(),
-        valueClass.getName());
-    String existing = conf.get(CRUNCH_OUTPUTS);
-    conf.set(CRUNCH_OUTPUTS, existing == null ? definition : existing + RECORD_SEP + definition);
-  }
-
-  /** The parsed definition of one named output. */
-  private static class OutputConfig<K, V> {
-    public FormatBundle<OutputFormat<K, V>> bundle;
-    public Class<K> keyClass;
-    public Class<V> valueClass;
-
-    public OutputConfig(FormatBundle<OutputFormat<K, V>> bundle,
-        Class<K> keyClass, Class<V> valueClass) {
-      this.bundle = bundle;
-      this.keyClass = keyClass;
-      this.valueClass = valueClass;
-    }
-  }
-
-  /**
-   * Parses the named-output definitions stored under {@link #CRUNCH_OUTPUTS}.
-   *
-   * @param context the task context whose configuration is read
-   * @return a map from output name to its parsed definition; empty when no named outputs
-   *     were registered
-   * @throws IllegalStateException if a definition does not have the expected field count
-   * @throws CrunchRuntimeException if a key or value class cannot be loaded
-   */
-  private static Map<String, OutputConfig> getNamedOutputs(
-      TaskInputOutputContext<?, ?, ?, ?> context) {
-    Map<String, OutputConfig> out = Maps.newHashMap();
-    Configuration conf = context.getConfiguration();
-    String definitions = conf.get(CRUNCH_OUTPUTS);
-    if (definitions == null || definitions.isEmpty()) {
-      // Nothing was registered; Splitter.split(null) would otherwise throw an NPE here.
-      return out;
-    }
-    for (String definition : Splitter.on(RECORD_SEP).split(definitions)) {
-      List<String> fields = Lists.newArrayList(SPLITTER.split(definition));
-      if (fields.size() != FIELDS_PER_OUTPUT) {
-        throw new IllegalStateException("Malformed entry in " + CRUNCH_OUTPUTS
-            + ": expected " + FIELDS_PER_OUTPUT + " fields but found " + fields.size());
-      }
-      String name = fields.get(0);
-      FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1),
-          OutputFormat.class);
-      try {
-        Class<?> keyClass = Class.forName(fields.get(2));
-        Class<?> valueClass = Class.forName(fields.get(3));
-        out.put(name, new OutputConfig(bundle, keyClass, valueClass));
-      } catch (ClassNotFoundException e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-    return out;
-  }
-
-  // Configuration key set on each per-output context before its record writer is created.
-  private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
-  // Counter group under which one counter per named output tracks records written.
-  private static final String COUNTERS_GROUP = CrunchOutputs.class.getName();
-
-  private final TaskInputOutputContext<?, ?, K, V> baseContext;
-  private final Map<String, OutputConfig> namedOutputs;
-  private final Map<String, RecordWriter<K, V>> recordWriters;
-  private final Map<String, TaskAttemptContext> taskContextCache;
-
-  /**
-   * Creates and initializes multiple outputs support,
-   * it should be instantiated in the Mapper/Reducer setup method.
-   *
-   * @param context the TaskInputOutputContext object
-   */
-  public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> context) {
-    this.baseContext = context;
-    this.namedOutputs = getNamedOutputs(context);
-    this.recordWriters = Maps.newHashMap();
-    this.taskContextCache = Maps.newHashMap();
-  }
-
-  /**
-   * Writes one key/value pair to the given named output and increments that output's
-   * counter in {@code COUNTERS_GROUP}.
-   *
-   * @param namedOutput a name registered with {@link #addNamedOutput}
-   * @param key the key to write
-   * @param value the value to write
-   * @throws IllegalArgumentException if {@code namedOutput} was never registered
-   * @throws IOException if the record writer cannot be created or fails to write
-   * @throws InterruptedException if interrupted while writing
-   */
-  @SuppressWarnings("unchecked")
-  public void write(String namedOutput, K key, V value)
-      throws IOException, InterruptedException {
-    if (!namedOutputs.containsKey(namedOutput)) {
-      throw new IllegalArgumentException("Undefined named output '" +
-          namedOutput + "'");
-    }
-    TaskAttemptContext taskContext = getContext(namedOutput);
-    baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1);
-    getRecordWriter(taskContext, namedOutput).write(key, value);
-  }
-
-  /**
-   * Closes every record writer opened by {@link #write}.
-   *
-   * <p>Each writer is closed with the per-output task context it was created from, so it
-   * sees the same configuration at close time as at creation time. The writer cache is
-   * cleared afterwards so that a repeated call does not close the same writer twice.
-   *
-   * @throws IOException if a writer fails to close
-   * @throws InterruptedException if interrupted while closing a writer
-   */
-  public void close() throws IOException, InterruptedException {
-    for (Map.Entry<String, RecordWriter<K, V>> entry : recordWriters.entrySet()) {
-      TaskAttemptContext taskContext = taskContextCache.get(entry.getKey());
-      // write() always caches the context before creating the writer; fall back to the base
-      // context defensively in case that invariant is ever broken.
-      entry.getValue().close(taskContext != null ? taskContext : baseContext);
-    }
-    recordWriters.clear();
-  }
-
-  /**
-   * Returns the cached task context for a named output, building it on first use.
-   *
-   * @param nameOutput a registered named output
-   * @return a task context whose configuration carries the output's format, key/value
-   *     classes and bundle settings
-   * @throws IOException if the helper {@code Job} cannot be created
-   */
-  private TaskAttemptContext getContext(String nameOutput) throws IOException {
-    TaskAttemptContext taskContext = taskContextCache.get(nameOutput);
-    if (taskContext != null) {
-      return taskContext;
-    }
-
-    // The following trick leverages the instantiation of a record writer via
-    // the job thus supporting arbitrary output formats.
-    OutputConfig outConfig = namedOutputs.get(nameOutput);
-    Configuration conf = new Configuration(baseContext.getConfiguration());
-    Job job = new Job(conf);
-    job.getConfiguration().set("crunch.namedoutput", nameOutput);
-    job.setOutputFormatClass(outConfig.bundle.getFormatClass());
-    job.setOutputKeyClass(outConfig.keyClass);
-    job.setOutputValueClass(outConfig.valueClass);
-    outConfig.bundle.configure(job.getConfiguration());
-    taskContext = TaskAttemptContextFactory.create(
-        job.getConfiguration(), baseContext.getTaskAttemptID());
-
-    taskContextCache.put(nameOutput, taskContext);
-    return taskContext;
-  }
-
-  /**
-   * Returns the cached record writer for a named output, creating it from the output
-   * format configured on {@code taskContext} on first use.
-   *
-   * @param taskContext the per-output context produced by {@link #getContext}
-   * @param namedOutput the named output whose writer is requested
-   * @return the record writer for {@code namedOutput}
-   * @throws IOException if the output format class cannot be loaded or the writer fails
-   * @throws InterruptedException if interrupted while creating the writer
-   */
-  private synchronized RecordWriter<K, V> getRecordWriter(
-      TaskAttemptContext taskContext, String namedOutput)
-      throws IOException, InterruptedException {
-    // look for record-writer in the cache
-    RecordWriter<K, V> writer = recordWriters.get(namedOutput);
-
-    // If not in cache, create a new one
-    if (writer == null) {
-      // get the record writer from context output format
-      taskContext.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
-      try {
-        OutputFormat format = ReflectionUtils.newInstance(
-            taskContext.getOutputFormatClass(),
-            taskContext.getConfiguration());
-        writer = format.getRecordWriter(taskContext);
-      } catch (ClassNotFoundException e) {
-        throw new IOException(e);
-      }
-      recordWriters.put(namedOutput, writer);
-    }
-
-    return writer;
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java b/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java
deleted file mode 100644
index cf93651..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Encapsulates rules for naming output files. It is the responsibility of
- * implementors to avoid file name collisions.
- */
-public interface FileNamingScheme {
-
- /**
- * Get the output file name for a map task. Note that the implementation is
- * responsible for avoiding naming collisions.
- *
- * @param configuration The configuration of the job for which the map output
- * is being written
- * @param outputDirectory The directory where the output will be written
- * @return The filename for the output of the map task
- * @throws IOException if an exception occurs while accessing the output file
- * system
- */
- String getMapOutputName(Configuration configuration, Path outputDirectory) throws IOException;
-
- /**
- * Get the output file name for a reduce task. Note that the implementation is
- * responsible for avoiding naming collisions.
- *
- * @param configuration The configuration of the job for which output is being
- * written
- * @param outputDirectory The directory where the file will be written
- * @param partitionId The partition of the reduce task being output
- * @return The filename for the output of the reduce task
- * @throws IOException if an exception occurs while accessing output file
- * system
- */
- String getReduceOutputName(Configuration configuration, Path outputDirectory, int partitionId) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/FileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/FileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/FileReaderFactory.java
deleted file mode 100644
index 5cccb7b..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/FileReaderFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Factory for iterators over values of type {@code T} read from a file system path.
- *
- * @param <T> the type of value produced by {@link #read}
- */
-public interface FileReaderFactory<T> {
-
-  /**
-   * Returns an iterator over the {@code T} values read from {@code path} on {@code fs}.
-   *
-   * <p>NOTE(review): whether {@code path} may name a directory, and whether the iterator
-   * must be exhausted to release underlying streams, is left to implementations. Confirm
-   * this against them.
-   *
-   * @param fs the file system containing {@code path}
-   * @param path the location to read from
-   * @return an iterator over the values read
-   */
-  Iterator<T> read(FileSystem fs, Path path);
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/FormatBundle.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/FormatBundle.java b/crunch/src/main/java/org/apache/crunch/io/FormatBundle.java
deleted file mode 100644
index d969009..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/FormatBundle.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.OutputFormat;
-
-import com.google.common.collect.Maps;
-
-/**
- * A combination of an {@link InputFormat} or {@link OutputFormat} and any extra
- * configuration information that format class needs to run.
- *
- * <p>The {@code FormatBundle} allow us to let different formats act as
- * if they are the only format that exists in a particular MapReduce job, even
- * when we have multiple types of inputs and outputs within a single job.
- */
-public class FormatBundle<K> implements Serializable {
-
- private Class<K> formatClass;
- private Map<String, String> extraConf;
-
- public static <T> FormatBundle<T> fromSerialized(String serialized, Class<T> clazz) {
- ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized));
- try {
- ObjectInputStream ois = new ObjectInputStream(bais);
- FormatBundle<T> bundle = (FormatBundle<T>) ois.readObject();
- ois.close();
- return bundle;
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static <T extends InputFormat<?, ?>> FormatBundle<T> forInput(Class<T> inputFormatClass) {
- return new FormatBundle<T>(inputFormatClass);
- }
-
- public static <T extends OutputFormat<?, ?>> FormatBundle<T> forOutput(Class<T> inputFormatClass) {
- return new FormatBundle<T>(inputFormatClass);
- }
-
- private FormatBundle(Class<K> formatClass) {
- this.formatClass = formatClass;
- this.extraConf = Maps.newHashMap();
- }
-
- public FormatBundle<K> set(String key, String value) {
- this.extraConf.put(key, value);
- return this;
- }
-
- public Class<K> getFormatClass() {
- return formatClass;
- }
-
- public Configuration configure(Configuration conf) {
- for (Map.Entry<String, String> e : extraConf.entrySet()) {
- conf.set(e.getKey(), e.getValue());
- }
- return conf;
- }
-
- public String serialize() {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try {
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(this);
- oos.close();
- return Base64.encodeBase64String(baos.toByteArray());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public String getName() {
- return formatClass.getSimpleName();
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(formatClass).append(extraConf).toHashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof FormatBundle)) {
- return false;
- }
- FormatBundle<K> oib = (FormatBundle<K>) other;
- return formatClass.equals(oib.formatClass) && extraConf.equals(oib.extraConf);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/From.java b/crunch/src/main/java/org/apache/crunch/io/From.java
deleted file mode 100644
index e4cfb6a..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/From.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.crunch.Source;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.io.avro.AvroFileSource;
-import org.apache.crunch.io.impl.FileTableSourceImpl;
-import org.apache.crunch.io.seq.SeqFileSource;
-import org.apache.crunch.io.seq.SeqFileTableSource;
-import org.apache.crunch.io.text.TextFileSource;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * <p>Static factory methods for creating common {@link Source} types.</p>
- *
- * <p>The {@code From} class is intended to provide a literate API for creating
- * Crunch pipelines from common input file types.
- *
- * <code>
- * Pipeline pipeline = new MRPipeline(this.getClass());
- *
- * // Reference the lines of a text file by wrapping the TextInputFormat class.
- * PCollection<String> lines = pipeline.read(From.textFile("/path/to/myfiles"));
- *
- * // Reference entries from a sequence file where the key is a LongWritable and the
- * // value is a custom Writable class.
- * PTable<LongWritable, MyWritable> table = pipeline.read(From.sequenceFile(
- * "/path/to/seqfiles", LongWritable.class, MyWritable.class));
- *
- * // Reference the records from an Avro file, where MyAvroObject implements Avro's
- * // SpecificRecord interface.
- * PCollection<MyAvroObject> myObjects = pipeline.read(From.avroFile("/path/to/avrofiles",
- * MyAvroObject.class));
- *
- * // References the key-value pairs from a custom extension of FileInputFormat:
- * PTable<KeyWritable, ValueWritable> custom = pipeline.read(From.formattedFile(
- * "/custom", MyFileInputFormat.class, KeyWritable.class, ValueWritable.class));
- * </code>
- * </p>
- */
-public class From {
-
- /**
- * Creates a {@code TableSource<K, V>} for reading data from files that have custom
- * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource}
- * and {@code Source} factory methods.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param formatClass The {@code FileInputFormat} implementation
- * @param keyClass The {@code Writable} to use for the key
- * @param valueClass The {@code Writable} to use for the value
- * @return A new {@code TableSource<K, V>} instance
- */
- public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile(
- String pathName, Class<? extends FileInputFormat<K, V>> formatClass,
- Class<K> keyClass, Class<V> valueClass) {
- return formattedFile(new Path(pathName), formatClass, keyClass, valueClass);
- }
-
- /**
- * Creates a {@code TableSource<K, V>} for reading data from files that have custom
- * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource}
- * and {@code Source} factory methods.
- *
- * @param The {@code Path} to the data
- * @param formatClass The {@code FileInputFormat} implementation
- * @param keyClass The {@code Writable} to use for the key
- * @param valueClass The {@code Writable} to use for the value
- * @return A new {@code TableSource<K, V>} instance
- */
- public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile(
- Path path, Class<? extends FileInputFormat<K, V>> formatClass,
- Class<K> keyClass, Class<V> valueClass) {
- return formattedFile(path, formatClass, Writables.writables(keyClass),
- Writables.writables(valueClass));
- }
-
- /**
- * Creates a {@code TableSource<K, V>} for reading data from files that have custom
- * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
- * and {@code Source} factory methods.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param formatClass The {@code FileInputFormat} implementation
- * @param keyType The {@code PType} to use for the key
- * @param valueType The {@code PType} to use for the value
- * @return A new {@code TableSource<K, V>} instance
- */
- public static <K, V> TableSource<K, V> formattedFile(String pathName,
- Class<? extends FileInputFormat<?, ?>> formatClass,
- PType<K> keyType, PType<V> valueType) {
- return formattedFile(new Path(pathName), formatClass, keyType, valueType);
- }
-
- /**
- * Creates a {@code TableSource<K, V>} for reading data from files that have custom
- * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
- * and {@code Source} factory methods.
- *
- * @param The {@code Path} to the data
- * @param formatClass The {@code FileInputFormat} implementation
- * @param keyType The {@code PType} to use for the key
- * @param valueType The {@code PType} to use for the value
- * @return A new {@code TableSource<K, V>} instance
- */
- public static <K, V> TableSource<K, V> formattedFile(Path path,
- Class<? extends FileInputFormat<?, ?>> formatClass,
- PType<K> keyType, PType<V> valueType) {
- PTableType<K, V> tableType = keyType.getFamily().tableOf(keyType, valueType);
- return new FileTableSourceImpl<K, V>(path, tableType, formatClass);
- }
-
- /**
- * Creates a {@code Source<T>} instance from the Avro file(s) at the given path name.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
- * @return A new {@code Source<T>} instance
- */
- public static <T extends SpecificRecord> Source<T> avroFile(String pathName, Class<T> avroClass) {
- return avroFile(new Path(pathName), avroClass);
- }
-
- /**
- * Creates a {@code Source<T>} instance from the Avro file(s) at the given {@code Path}.
- *
- * @param path The {@code Path} to the data
- * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
- * @return A new {@code Source<T>} instance
- */
- public static <T extends SpecificRecord> Source<T> avroFile(Path path, Class<T> avroClass) {
- return avroFile(path, Avros.specifics(avroClass));
- }
-
- /**
- * Creates a {@code Source<T>} instance from the Avro file(s) at the given path name.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param avroType The {@code AvroType} for the Avro records
- * @return A new {@code Source<T>} instance
- */
- public static <T> Source<T> avroFile(String pathName, AvroType<T> avroType) {
- return avroFile(new Path(pathName), avroType);
- }
-
- /**
- * Creates a {@code Source<T>} instance from the Avro file(s) at the given {@code Path}.
- *
- * @param path The {@code Path} to the data
- * @param avroType The {@code AvroType} for the Avro records
- * @return A new {@code Source<T>} instance
- */
- public static <T> Source<T> avroFile(Path path, AvroType<T> avroType) {
- return new AvroFileSource<T>(path, avroType);
- }
-
- /**
- * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given path name
- * from the value field of each key-value pair in the SequenceFile(s).
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
- * @return A new {@code Source<T>} instance
- */
- public static <T extends Writable> Source<T> sequenceFile(String pathName, Class<T> valueClass) {
- return sequenceFile(new Path(pathName), valueClass);
- }
-
- /**
- * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given {@code Path}
- * from the value field of each key-value pair in the SequenceFile(s).
- *
- * @param path The {@code Path} to the data
- * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
- * @return A new {@code Source<T>} instance
- */
- public static <T extends Writable> Source<T> sequenceFile(Path path, Class<T> valueClass) {
- return sequenceFile(path, Writables.writables(valueClass));
- }
-
- /**
- * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given path name
- * from the value field of each key-value pair in the SequenceFile(s).
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param ptype The {@code PType} for the value of the SequenceFile entry
- * @return A new {@code Source<T>} instance
- */
- public static <T> Source<T> sequenceFile(String pathName, PType<T> ptype) {
- return sequenceFile(new Path(pathName), ptype);
- }
-
- /**
- * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given {@code Path}
- * from the value field of each key-value pair in the SequenceFile(s).
- *
- * @param path The {@code Path} to the data
- * @param ptype The {@code PType} for the value of the SequenceFile entry
- * @return A new {@code Source<T>} instance
- */
- public static <T> Source<T> sequenceFile(Path path, PType<T> ptype) {
- return new SeqFileSource<T>(path, ptype);
- }
-
- /**
- * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given path name.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param keyClass The {@code Writable} subclass for the key of the SequenceFile entry
- * @param valueClass The {@code Writable} subclass for the value of the SequenceFile entry
- * @return A new {@code SourceTable<K, V>} instance
- */
- public static <K extends Writable, V extends Writable> TableSource<K, V> sequenceFile(
- String pathName, Class<K> keyClass, Class<V> valueClass) {
- return sequenceFile(new Path(pathName), keyClass, valueClass);
- }
-
- /**
- * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given {@code Path}.
- *
- * @param path The {@code Path} to the data
- * @param keyClass The {@code Writable} subclass for the key of the SequenceFile entry
- * @param valueClass The {@code Writable} subclass for the value of the SequenceFile entry
- * @return A new {@code SourceTable<K, V>} instance
- */
- public static <K extends Writable, V extends Writable> TableSource<K, V> sequenceFile(
- Path path, Class<K> keyClass, Class<V> valueClass) {
- return sequenceFile(path, Writables.writables(keyClass), Writables.writables(valueClass));
- }
-
- /**
- * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given path name.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param keyType The {@code PType} for the key of the SequenceFile entry
- * @param valueType The {@code PType} for the value of the SequenceFile entry
- * @return A new {@code SourceTable<K, V>} instance
- */
- public static <K, V> TableSource<K, V> sequenceFile(String pathName, PType<K> keyType, PType<V> valueType) {
- return sequenceFile(new Path(pathName), keyType, valueType);
- }
-
- /**
- * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given {@code Path}.
- *
- * @param path The {@code Path} to the data
- * @param keyType The {@code PType} for the key of the SequenceFile entry
- * @param valueType The {@code PType} for the value of the SequenceFile entry
- * @return A new {@code SourceTable<K, V>} instance
- */
- public static <K, V> TableSource<K, V> sequenceFile(Path path, PType<K> keyType, PType<V> valueType) {
- PTypeFamily ptf = keyType.getFamily();
- return new SeqFileTableSource<K, V>(path, ptf.tableOf(keyType, valueType));
- }
-
- /**
- * Creates a {@code Source<String>} instance for the text file(s) at the given path name.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @return A new {@code Source<String>} instance
- */
- public static Source<String> textFile(String pathName) {
- return textFile(new Path(pathName));
- }
-
- /**
- * Creates a {@code Source<String>} instance for the text file(s) at the given {@code Path}.
- *
- * @param path The {@code Path} to the data
- * @return A new {@code Source<String>} instance
- */
- public static Source<String> textFile(Path path) {
- return textFile(path, Writables.strings());
- }
-
- /**
- * Creates a {@code Source<T>} instance for the text file(s) at the given path name using
- * the provided {@code PType<T>} to convert the input text.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param ptype The {@code PType<T>} to use to process the input text
- * @return A new {@code Source<T>} instance
- */
- public static <T> Source<T> textFile(String pathName, PType<T> ptype) {
- return textFile(new Path(pathName), ptype);
- }
-
- /**
- * Creates a {@code Source<T>} instance for the text file(s) at the given {@code Path} using
- * the provided {@code PType<T>} to convert the input text.
- *
- * @param path The {@code Path} to the data
- * @param ptype The {@code PType<T>} to use to process the input text
- * @return A new {@code Source<T>} instance
- */
- public static <T> Source<T> textFile(Path path, PType<T> ptype) {
- return new TextFileSource<T>(path, ptype);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java b/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java
deleted file mode 100644
index b484103..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.crunch.Target;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * A {@link Target} that can configure a Hadoop MapReduce {@link Job} so that the job's
- * output is written to it.
- */
-public interface MapReduceTarget extends Target {
-
-  /**
-   * Configures {@code job} to write data of type {@code ptype} to this target.
-   *
-   * @param job the job being configured
-   * @param ptype the type of the data that will be written
-   * @param outputPath an output location supplied by the caller. NOTE(review): its exact
-   *     role (e.g. a temporary working directory) is implementation-specific; for example,
-   *     {@code PathTargetImpl} ignores it and always uses its own path
-   * @param name when {@code null}, implementations such as {@code PathTargetImpl} configure
-   *     the job's primary output; otherwise they register a named output under this name
-   */
-  void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name);
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/OutputHandler.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/OutputHandler.java b/crunch/src/main/java/org/apache/crunch/io/OutputHandler.java
deleted file mode 100644
index 01d7f99..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/OutputHandler.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.crunch.Target;
-import org.apache.crunch.types.PType;
-
-/**
- * Handler through which a {@link Target} can be set up to receive data of a given
- * {@link PType}. Instances are passed to target methods such as
- * {@code AvroFileTarget#accept(OutputHandler, PType)}.
- */
-public interface OutputHandler {
-
-  /**
-   * Asks this handler to configure {@code target} for data of type {@code ptype}.
-   *
-   * @param target the target being configured
-   * @param ptype the type of the data to be written
-   * @return NOTE(review): presumably {@code true} when the target was configured; the
-   *     meaning of the result is not visible here, so confirm against implementations
-   */
-  boolean configure(Target target, PType<?> ptype);
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/PathTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/PathTarget.java b/crunch/src/main/java/org/apache/crunch/io/PathTarget.java
deleted file mode 100644
index 7a35209..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/PathTarget.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.hadoop.fs.Path;
-
-/**
- * A {@link MapReduceTarget} whose output is written to a given path on a file system.
- */
-public interface PathTarget extends MapReduceTarget {
-
-  /**
-   * Returns the file-system location that this target's output goes to.
-   *
-   * @return the output path of this target
-   */
-  Path getPath();
-
-  /**
-   * Returns the scheme that decides the names of the output files written under
-   * {@link #getPath()}.
-   *
-   * @return the file naming scheme for this target's outputs
-   */
-  FileNamingScheme getFileNamingScheme();
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
deleted file mode 100644
index 0be3f9a..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-/**
- * Base class for {@link PathTarget}s that write through a fixed {@code OutputFormat}
- * with fixed output key and value classes.
- */
-public abstract class PathTargetImpl implements PathTarget {
-
-  private final Path path;
-  private final Class<OutputFormat> outputFormatClass;
-  private final Class keyClass;
-  private final Class valueClass;
-
-  /**
-   * Creates a target whose output directory is given as a path string.
-   *
-   * @param path output directory, converted with {@code new Path(path)}
-   * @param outputFormatClass the {@code OutputFormat} used to write records
-   * @param keyClass the output key class
-   * @param valueClass the output value class
-   */
-  public PathTargetImpl(String path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) {
-    this(new Path(path), outputFormatClass, keyClass, valueClass);
-  }
-
-  /**
-   * Creates a target whose output directory is {@code path}.
-   *
-   * @param path output directory
-   * @param outputFormatClass the {@code OutputFormat} used to write records
-   * @param keyClass the output key class
-   * @param valueClass the output value class
-   */
-  public PathTargetImpl(Path path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) {
-    this.path = path;
-    this.outputFormatClass = outputFormatClass;
-    this.keyClass = keyClass;
-    this.valueClass = valueClass;
-  }
-
-  /**
-   * Points the job's output directory at this target's path. When {@code name} is
-   * {@code null}, the output format and key/value classes are installed as the job's
-   * primary output; otherwise they are registered as a named output via
-   * {@code CrunchOutputs}.
-   */
-  @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
-    // NOTE(review): outputPath is not consulted; output always goes to this target's own
-    // path. Confirm this is intended for all subclasses.
-    applyOutputPath(job);
-    if (name != null) {
-      CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
-      return;
-    }
-    job.setOutputFormatClass(outputFormatClass);
-    job.setOutputKeyClass(keyClass);
-    job.setOutputValueClass(valueClass);
-  }
-
-  /** Sets the job's output directory, wrapping any failure in a {@code RuntimeException}. */
-  private void applyOutputPath(Job job) {
-    try {
-      FileOutputFormat.setOutputPath(job, path);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Path getPath() {
-    return this.path;
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
deleted file mode 100644
index 0407167..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.io.IOException;
-
-import org.apache.crunch.Source;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A {@link Source} whose contents can also be read directly by client code as a series
- * of records. This is used to determine whether a {@code PCollection} instance can be
- * materialized.
- *
- * @param <T> the type of record produced by this source
- */
-public interface ReadableSource<T> extends Source<T> {
-
-  /**
-   * Reads the contents of this source.
-   *
-   * @param conf the {@code Configuration} to use when accessing the underlying data
-   * @return the records of this source, exposed as an {@code Iterable}
-   * @throws IOException if the underlying data cannot be accessed
-   */
-  Iterable<T> read(Configuration conf) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
deleted file mode 100644
index 95c90aa..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.crunch.SourceTarget;
-
-/**
- * A {@link SourceTarget} that can also be read into the local client. It simply combines
- * {@link ReadableSource} and {@link SourceTarget}.
- *
- * @param <T> the type of data read
- */
-public interface ReadableSourceTarget<T> extends ReadableSource<T>, SourceTarget<T> {
-  // Marker combination of the two parent interfaces; no additional methods.
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java b/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
deleted file mode 100644
index bdda8e6..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Default {@link FileNamingScheme}. It builds names of the form {@code part-m-NNNNN} or
- * {@code part-r-NNNNN}, where {@code NNNNN} is the number of entries already present in
- * the output directory.
- *
- * <p>NOTE(review): names are unique only while the directory's existing entries do not
- * already use the next number (e.g. after deletions) and only one writer names files at a
- * time.
- */
-public class SequentialFileNamingScheme implements FileNamingScheme {
-
-  /** Marker embedded in names of files coming from map-side output. */
-  private static final String MAP_MARKER = "m";
-
-  /** Marker embedded in names of files coming from reduce-side output. */
-  private static final String REDUCE_MARKER = "r";
-
-  @Override
-  public String getMapOutputName(Configuration configuration, Path outputDirectory) throws IOException {
-    return nextFileName(configuration, outputDirectory, MAP_MARKER);
-  }
-
-  @Override
-  public String getReduceOutputName(Configuration configuration, Path outputDirectory, int partitionId)
-      throws IOException {
-    // partitionId is not used; the sequence number comes from the directory listing.
-    return nextFileName(configuration, outputDirectory, REDUCE_MARKER);
-  }
-
-  /** Formats a name whose sequence number is the current entry count of {@code outputDirectory}. */
-  private String nextFileName(Configuration configuration, Path outputDirectory, String marker)
-      throws IOException {
-    FileSystem fs = outputDirectory.getFileSystem(configuration);
-    int existingEntries = fs.listStatus(outputDirectory).length;
-    return String.format("part-%s-%05d", marker, existingEntries);
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java b/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
deleted file mode 100644
index f4400de..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Functions for configuring the inputs/outputs of MapReduce jobs.
- *
- */
-public class SourceTargetHelper {
-
- public static long getPathSize(Configuration conf, Path path) throws IOException {
- return getPathSize(path.getFileSystem(conf), path);
- }
-
- public static long getPathSize(FileSystem fs, Path path) throws IOException {
- FileStatus[] stati = fs.globStatus(path);
- if (stati == null || stati.length == 0) {
- return -1L;
- }
- long size = 0;
- for (FileStatus status : stati) {
- size += fs.getContentSummary(status.getPath()).getLength();
- }
- return size;
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/To.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/To.java b/crunch/src/main/java/org/apache/crunch/io/To.java
deleted file mode 100644
index d62d294..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/To.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.crunch.Target;
-import org.apache.crunch.io.avro.AvroFileTarget;
-import org.apache.crunch.io.impl.FileTargetImpl;
-import org.apache.crunch.io.seq.SeqFileTarget;
-import org.apache.crunch.io.text.TextFileTarget;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-/**
- * <p>Static factory methods for creating common {@link Target} types.</p>
- *
- * <p>The {@code To} class is intended to be used as part of a literate API
- * for writing the output of Crunch pipelines to common file types. We can use
- * the {@code Target} objects created by the factory methods in the {@code To}
- * class with either the {@code write} method on the {@code Pipeline} class or
- * the convenience {@code write} method on {@code PCollection} and {@code PTable}
- * instances.</p>
- *
- * <pre>{@code
- * Pipeline pipeline = new MRPipeline(this.getClass());
- * ...
- * // Write a PCollection<String> to a text file:
- * PCollection<String> words = ...;
- * pipeline.write(words, To.textFile("/put/my/words/here"));
- *
- * // Write a PTable<Text, Text> to a sequence file:
- * PTable<Text, Text> textToText = ...;
- * textToText.write(To.sequenceFile("/words/to/words"));
- *
- * // Write a PCollection<MyAvroObject> to an Avro data file:
- * PCollection<MyAvroObject> objects = ...;
- * objects.write(To.avroFile("/my/avro/files"));
- *
- * // Write a PTable to a custom FileOutputFormat:
- * PTable<KeyWritable, ValueWritable> custom = ...;
- * pipeline.write(custom, To.formattedFile("/custom", MyFileFormat.class));
- * }</pre>
- */
-public class To {
-
-  /**
-   * Creates a {@code Target} at the given path name that writes data to
-   * a custom {@code FileOutputFormat}.
-   *
-   * @param <K> the key type written by the output format
-   * @param <V> the value type written by the output format
-   * @param pathName The name of the path to write the data to on the filesystem
-   * @param formatClass The {@code FileOutputFormat<K, V>} to write the data to
-   * @return A new {@code Target} instance
-   */
-  public static <K extends Writable, V extends Writable> Target formattedFile(
-      String pathName, Class<? extends FileOutputFormat<K, V>> formatClass) {
-    return formattedFile(new Path(pathName), formatClass);
-  }
-
-  /**
-   * Creates a {@code Target} at the given {@code Path} that writes data to
-   * a custom {@code FileOutputFormat}. Output files are named by a
-   * {@link SequentialFileNamingScheme}.
-   *
-   * @param <K> the key type written by the output format
-   * @param <V> the value type written by the output format
-   * @param path The {@code Path} to write the data to
-   * @param formatClass The {@code FileOutputFormat<K, V>} to write the data to
-   * @return A new {@code Target} instance
-   */
-  public static <K extends Writable, V extends Writable> Target formattedFile(
-      Path path, Class<? extends FileOutputFormat<K, V>> formatClass) {
-    return new FileTargetImpl(path, formatClass, new SequentialFileNamingScheme());
-  }
-
-  /**
-   * Creates a {@code Target} at the given path name that writes data to
-   * Avro files. The {@code PType} for the written data must be for Avro records.
-   *
-   * @param pathName The name of the path to write the data to on the filesystem
-   * @return A new {@code Target} instance
-   */
-  public static Target avroFile(String pathName) {
-    return avroFile(new Path(pathName));
-  }
-
-  /**
-   * Creates a {@code Target} at the given {@code Path} that writes data to
-   * Avro files. The {@code PType} for the written data must be for Avro records.
-   *
-   * @param path The {@code Path} to write the data to
-   * @return A new {@code Target} instance
-   */
-  public static Target avroFile(Path path) {
-    return new AvroFileTarget(path);
-  }
-
-  /**
-   * Creates a {@code Target} at the given path name that writes data to
-   * SequenceFiles.
-   *
-   * @param pathName The name of the path to write the data to on the filesystem
-   * @return A new {@code Target} instance
-   */
-  public static Target sequenceFile(String pathName) {
-    return sequenceFile(new Path(pathName));
-  }
-
-  /**
-   * Creates a {@code Target} at the given {@code Path} that writes data to
-   * SequenceFiles.
-   *
-   * @param path The {@code Path} to write the data to
-   * @return A new {@code Target} instance
-   */
-  public static Target sequenceFile(Path path) {
-    return new SeqFileTarget(path);
-  }
-
-  /**
-   * Creates a {@code Target} at the given path name that writes data to
-   * text files.
-   *
-   * @param pathName The name of the path to write the data to on the filesystem
-   * @return A new {@code Target} instance
-   */
-  public static Target textFile(String pathName) {
-    return textFile(new Path(pathName));
-  }
-
-  /**
-   * Creates a {@code Target} at the given {@code Path} that writes data to
-   * text files.
-   *
-   * @param path The {@code Path} to write the data to
-   * @return A new {@code Target} instance
-   */
-  public static Target textFile(Path path) {
-    return new TextFileTarget(path);
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
deleted file mode 100644
index c8fe23a..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.FsInput;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.io.FileReaderFactory;
-import org.apache.crunch.io.impl.AutoClosingIterator;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
-
-public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
-
- private static final Log LOG = LogFactory.getLog(AvroFileReaderFactory.class);
-
- private final DatumReader<T> recordReader;
- private final MapFn<T, T> mapFn;
-
- public AvroFileReaderFactory(AvroType<T> atype) {
- this.recordReader = createDatumReader(atype);
- this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
- }
-
- public AvroFileReaderFactory(Schema schema) {
- this.recordReader = new GenericDatumReader<T>(schema);
- this.mapFn = IdentityFn.<T>getInstance();
- }
-
- static <T> DatumReader<T> createDatumReader(AvroType<T> avroType) {
- if (avroType.hasReflect()) {
- if (avroType.hasSpecific()) {
- Avros.checkCombiningSpecificAndReflectionSchemas();
- }
- return new ReflectDatumReader<T>(avroType.getSchema());
- } else if (avroType.hasSpecific()) {
- return new SpecificDatumReader<T>(avroType.getSchema());
- } else {
- return new GenericDatumReader<T>(avroType.getSchema());
- }
- }
-
- @Override
- public Iterator<T> read(FileSystem fs, final Path path) {
- this.mapFn.initialize();
- try {
- FsInput fsi = new FsInput(path, fs.getConf());
- final DataFileReader<T> reader = new DataFileReader<T>(fsi, recordReader);
- return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
- @Override
- public boolean hasNext() {
- return reader.hasNext();
- }
-
- @Override
- public T next() {
- return mapFn.map(reader.next());
- }
- });
- } catch (IOException e) {
- LOG.info("Could not read avro file at path: " + path, e);
- return Iterators.emptyIterator();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
deleted file mode 100644
index 15792bf..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroJob;
-import org.apache.crunch.io.CompositePathIterable;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.impl.FileSourceImpl;
-import org.apache.crunch.types.avro.AvroInputFormat;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * A readable {@code Source} backed by Avro data files, read through {@code AvroInputFormat}.
- *
- * @param <T> the record type
- */
-public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
-
-  /**
-   * Builds the input {@code FormatBundle} carrying the reflect flag, the schema and the
-   * reflect-data-factory class name for {@code ptype}.
-   */
-  private static <S> FormatBundle getBundle(AvroType<S> ptype) {
-    FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class);
-    bundle = bundle.set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect()));
-    bundle = bundle.set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString());
-    bundle = bundle.set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName());
-    return bundle;
-  }
-
-  /**
-   * Creates a source over the Avro data at {@code path}.
-   *
-   * @param path location of the Avro data
-   * @param ptype the Avro type of the stored records
-   */
-  public AvroFileSource(Path path, AvroType<T> ptype) {
-    super(path, ptype, getBundle(ptype));
-  }
-
-  @Override
-  public String toString() {
-    String location = path.toString();
-    return "Avro(" + location + ")";
-  }
-
-  /** Reads every record under this source's path with an {@link AvroFileReaderFactory}. */
-  @Override
-  @SuppressWarnings("unchecked")
-  public Iterable<T> read(Configuration conf) throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    AvroType<T> avroType = (AvroType<T>) ptype;
-    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>(avroType));
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
deleted file mode 100644
index 76103e5..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro;
-
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.SequentialFileNamingScheme;
-import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.hadoop.fs.Path;
-
-/**
- * A readable source/target pair that reads and writes Avro data files at a single
- * {@link Path}.
- *
- * @param <T> the record type
- */
-public class AvroFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
-
-  /** Creates an instance that names its output files with a {@link SequentialFileNamingScheme}. */
-  public AvroFileSourceTarget(Path path, AvroType<T> atype) {
-    this(path, atype, new SequentialFileNamingScheme());
-  }
-
-  /** Creates an instance pairing an {@code AvroFileSource} and an {@code AvroFileTarget} on {@code path}. */
-  public AvroFileSourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) {
-    super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path), fileNamingScheme);
-  }
-
-  /** Describes this object using its target's description. */
-  @Override
-  public String toString() {
-    return this.target.toString();
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
deleted file mode 100644
index 3a9e42c..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro;
-
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.OutputHandler;
-import org.apache.crunch.io.SequentialFileNamingScheme;
-import org.apache.crunch.io.impl.FileTargetImpl;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.avro.AvroOutputFormat;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-
- /**
-  * A file target that writes Avro data through {@link AvroOutputFormat}; it only accepts
-  * {@link AvroType} element types.
-  */
- public class AvroFileTarget extends FileTargetImpl {
-
-   /** Creates a target for the given path string, using a {@link SequentialFileNamingScheme}. */
-   public AvroFileTarget(String path) {
-     this(new Path(path));
-   }
-
-   /** Creates a target for {@code path}, using a {@link SequentialFileNamingScheme}. */
-   public AvroFileTarget(Path path) {
-     this(path, new SequentialFileNamingScheme());
-   }
-
-   /** Creates a target for {@code path} with an explicit naming scheme for output files. */
-   public AvroFileTarget(Path path, FileNamingScheme fileNamingScheme) {
-     super(path, AvroOutputFormat.class, fileNamingScheme);
-   }
-
-   @Override
-   public String toString() {
-     return "Avro(" + path.toString() + ")";
-   }
-
-   /**
-    * Configures {@code handler} for this target only when {@code ptype} is an {@link AvroType}.
-    *
-    * @return {@code true} if the type was accepted, {@code false} otherwise
-    */
-   @Override
-   public boolean accept(OutputHandler handler, PType<?> ptype) {
-     if (!(ptype instanceof AvroType)) {
-       return false;
-     }
-     handler.configure(this, ptype);
-     return true;
-   }
-
-   /**
-    * Records the Avro output schema in the job configuration and sets up the job to write
-    * {@link AvroWrapper}/{@link NullWritable} pairs with {@link AvroOutputFormat}.
-    *
-    * @throws IllegalStateException if a different schema is already stored under the same key
-    */
-   @Override
-   public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
-     AvroType<?> atype = (AvroType<?>) ptype;
-     Configuration conf = job.getConfiguration();
-     // The single unnamed output uses the plain key; a named output gets the key suffixed with its name.
-     String schemaParam = (name == null) ? "avro.output.schema" : "avro.output.schema." + name;
-     String requestedSchema = atype.getSchema().toString();
-     String existingSchema = conf.get(schemaParam);
-     if (existingSchema == null) {
-       conf.set(schemaParam, requestedSchema);
-     } else if (!existingSchema.equals(requestedSchema)) {
-       // Keep the original message as a prefix and add enough detail to find the conflict.
-       throw new IllegalStateException("Avro targets must use the same output schema (key '" + schemaParam
-           + "' already holds " + existingSchema + " but " + requestedSchema + " was requested)");
-     }
-     Avros.configureReflectDataFactory(conf);
-     configureForMapReduce(job, AvroWrapper.class, NullWritable.class, AvroOutputFormat.class,
-         outputPath, name);
-   }
-
-   /**
-    * Returns an {@link AvroFileSourceTarget} over this target's path when {@code ptype} is an
-    * {@link AvroType}, or {@code null} otherwise.
-    */
-   @Override
-   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
-     if (ptype instanceof AvroType) {
-       // Safe: the instanceof check above guarantees an AvroType, and its element type is T by construction.
-       @SuppressWarnings("unchecked")
-       AvroType<T> atype = (AvroType<T>) ptype;
-       return new AvroFileSourceTarget<T>(path, atype);
-     }
-     return null;
-   }
- }
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java b/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
deleted file mode 100644
index 3bd802e..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-import com.google.common.collect.UnmodifiableIterator;
-import com.google.common.io.Closeables;
-
-/**
- * Closes the wrapped {@code Closeable} when {@link #hasNext()} returns false. As long a client loops through to
- * completion (doesn't abort early due to an exception, short circuit, etc.) resources will be closed automatically.
- */
-public class AutoClosingIterator<T> extends UnmodifiableIterator<T> implements Closeable {
- private final Iterator<T> iter;
- private Closeable closeable;
-
- public AutoClosingIterator(Closeable closeable, Iterator<T> iter) {
- this.closeable = closeable;
- this.iter = iter;
- }
-
- @Override
- public boolean hasNext() {
- if (!iter.hasNext()) {
- Closeables.closeQuietly(this);
- return false;
- } else {
- return true;
- }
- }
-
- @Override
- public T next() {
- return iter.next();
- }
-
- @Override
- public void close() throws IOException {
- if (closeable != null) {
- closeable.close();
- closeable = null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
deleted file mode 100644
index 688c801..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.Source;
-import org.apache.crunch.io.CrunchInputs;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.io.SourceTargetHelper;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-public class FileSourceImpl<T> implements Source<T> {
-
- private static final Log LOG = LogFactory.getLog(FileSourceImpl.class);
-
- protected final Path path;
- protected final PType<T> ptype;
- protected final FormatBundle<? extends InputFormat> inputBundle;
-
- public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) {
- this.path = path;
- this.ptype = ptype;
- this.inputBundle = FormatBundle.forInput(inputFormatClass);
- }
-
- public FileSourceImpl(Path path, PType<T> ptype, FormatBundle<? extends InputFormat> inputBundle) {
- this.path = path;
- this.ptype = ptype;
- this.inputBundle = inputBundle;
- }
-
- public Path getPath() {
- return path;
- }
-
- @Override
- public void configureSource(Job job, int inputId) throws IOException {
- if (inputId == -1) {
- FileInputFormat.addInputPath(job, path);
- job.setInputFormatClass(inputBundle.getFormatClass());
- inputBundle.configure(job.getConfiguration());
- } else {
- CrunchInputs.addInputPath(job, path, inputBundle, inputId);
- }
- }
-
- @Override
- public PType<T> getType() {
- return ptype;
- }
-
- @Override
- public long getSize(Configuration configuration) {
- try {
- return SourceTargetHelper.getPathSize(configuration, path);
- } catch (IOException e) {
- LOG.warn(String.format("Exception thrown looking up size of: %s", path), e);
- throw new IllegalStateException("Failed to get the file size of:" + path, e);
- }
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !getClass().equals(other.getClass())) {
- return false;
- }
- FileSourceImpl o = (FileSourceImpl) other;
- return ptype.equals(o.ptype) && path.equals(o.path) && inputBundle.equals(o.inputBundle);
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(ptype).append(path).append(inputBundle).toHashCode();
- }
-
- @Override
- public String toString() {
- return new StringBuilder().append(inputBundle.getName()).append("(").append(path).append(")").toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
deleted file mode 100644
index 295edb5..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.types.PTableType;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-public class FileTableSourceImpl<K, V> extends FileSourceImpl<Pair<K, V>> implements TableSource<K, V> {
-
- public FileTableSourceImpl(Path path, PTableType<K, V> tableType, Class<? extends FileInputFormat> formatClass) {
- super(path, tableType, formatClass);
- }
-
- public FileTableSourceImpl(Path path, PTableType<K, V> tableType, FormatBundle bundle) {
- super(path, tableType, bundle);
- }
-
- @Override
- public PTableType<K, V> getTableType() {
- return (PTableType<K, V>) getType();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
deleted file mode 100644
index c1c29e4..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.io.CrunchOutputs;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.OutputHandler;
-import org.apache.crunch.io.PathTarget;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- /**
-  * Base {@link PathTarget} that writes job output under a single {@link Path} with a
-  * {@link FileOutputFormat} subclass.
-  */
- public class FileTargetImpl implements PathTarget {
-
-   private static final Log LOG = LogFactory.getLog(FileTargetImpl.class);
-
-   protected final Path path;
-   private final Class<? extends FileOutputFormat> outputFormatClass;
-   private final FileNamingScheme fileNamingScheme;
-
-   /**
-    * @param path directory the output is written to
-    * @param outputFormatClass output format used to write records
-    * @param fileNamingScheme naming scheme returned by {@link #getFileNamingScheme()}
-    */
-   public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
-       FileNamingScheme fileNamingScheme) {
-     this.path = path;
-     this.outputFormatClass = outputFormatClass;
-     this.fileNamingScheme = fileNamingScheme;
-   }
-
-   /**
-    * Configures {@code job} to write {@code ptype} records. The key/value classes come from the type's
-    * converter, and the output format is this target's own.
-    */
-   @Override
-   public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
-     Converter converter = ptype.getConverter();
-     Class keyClass = converter.getKeyClass();
-     Class valueClass = converter.getValueClass();
-     configureForMapReduce(job, keyClass, valueClass, outputFormatClass, outputPath, name);
-   }
-
-   /**
-    * Sets the job's output path. It then either configures the job's single output ({@code name == null})
-    * or registers a named output through {@link CrunchOutputs}.
-    */
-   protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
-       Class outputFormatClass, Path outputPath, String name) {
-     try {
-       FileOutputFormat.setOutputPath(job, outputPath);
-     } catch (Exception e) {
-       throw new RuntimeException(e);
-     }
-     if (name == null) {
-       job.setOutputFormatClass(outputFormatClass);
-       job.setOutputKeyClass(keyClass);
-       job.setOutputValueClass(valueClass);
-     } else {
-       CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
-     }
-   }
-
-   /** Accepts every type: configures {@code handler} for this target and returns {@code true}. */
-   @Override
-   public boolean accept(OutputHandler handler, PType<?> ptype) {
-     handler.configure(this, ptype);
-     return true;
-   }
-
-   @Override
-   public Path getPath() {
-     return path;
-   }
-
-   @Override
-   public FileNamingScheme getFileNamingScheme() {
-     return fileNamingScheme;
-   }
-
-   /**
-    * Targets are equal when they have the same concrete class and path. The output format and
-    * naming scheme are not compared.
-    */
-   @Override
-   public boolean equals(Object other) {
-     if (other == null || !getClass().equals(other.getClass())) {
-       return false;
-     }
-     FileTargetImpl o = (FileTargetImpl) other;
-     return path.equals(o.path);
-   }
-
-   @Override
-   public int hashCode() {
-     return new HashCodeBuilder().append(path).toHashCode();
-   }
-
-   @Override
-   public String toString() {
-     return new StringBuilder().append(outputFormatClass.getSimpleName()).append("(").append(path).append(")")
-         .toString();
-   }
-
-   @Override
-   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
-     // By default, assume that we cannot do this.
-     return null;
-   }
-
-   /**
-    * Prepares {@link #path} for writing according to {@code strategy}.
-    * <ul>
-    *   <li>DEFAULT fails if the path exists.</li>
-    *   <li>OVERWRITE recursively deletes it.</li>
-    *   <li>APPEND leaves the existing files in place.</li>
-    * </ul>
-    *
-    * @throws CrunchRuntimeException in any of these cases:
-    *     the filesystem cannot be queried, the path exists under DEFAULT,
-    *     the existing data cannot be removed under OVERWRITE, or the mode is unknown
-    */
-   @Override
-   public void handleExisting(WriteMode strategy, Configuration conf) {
-     FileSystem fs;
-     try {
-       // Resolve the filesystem that owns this path. FileSystem.get(conf) returns the *default* filesystem,
-       // which is the wrong one for a fully-qualified path on another filesystem.
-       fs = path.getFileSystem(conf);
-     } catch (IOException e) {
-       LOG.error("Could not retrieve FileSystem object to check for existing path", e);
-       throw new CrunchRuntimeException(e);
-     }
-
-     boolean exists;
-     try {
-       exists = fs.exists(path);
-     } catch (IOException e) {
-       LOG.error("Exception checking existence of path: " + path, e);
-       throw new CrunchRuntimeException(e);
-     }
-
-     if (!exists) {
-       LOG.info("Will write output files to new path: " + path);
-       return;
-     }
-
-     switch (strategy) {
-       case DEFAULT:
-         LOG.error("Path " + path + " already exists!");
-         throw new CrunchRuntimeException("Path already exists: " + path);
-       case OVERWRITE:
-         LOG.info("Removing data at existing path: " + path);
-         deleteExisting(fs);
-         break;
-       case APPEND:
-         LOG.info("Adding output files to existing path: " + path);
-         break;
-       default:
-         throw new CrunchRuntimeException("Unknown WriteMode: " + strategy);
-     }
-   }
-
-   /**
-    * Recursively deletes {@link #path}. It fails fast instead of letting the job run on top of data that
-    * should have been removed.
-    */
-   private void deleteExisting(FileSystem fs) {
-     try {
-       // delete() returns false when nothing was removed; that is only an error if the path is still present.
-       if (!fs.delete(path, true) && fs.exists(path)) {
-         LOG.error("Could not remove existing data at path: " + path);
-         throw new CrunchRuntimeException("Could not remove existing data at path: " + path);
-       }
-     } catch (IOException e) {
-       LOG.error("Exception thrown removing data at path: " + path, e);
-       throw new CrunchRuntimeException(e);
-     }
-   }
- }
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
deleted file mode 100644
index 6506816..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import java.io.IOException;
-
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.PathTarget;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.hadoop.conf.Configuration;
-
- /**
-  * A {@link SourcePathTargetImpl} built from a {@link ReadableSource}. Its {@link #read} delegates to that source.
-  *
-  * @param <T> element type
-  */
- public class ReadableSourcePathTargetImpl<T> extends SourcePathTargetImpl<T> implements ReadableSourceTarget<T> {
-
-   public ReadableSourcePathTargetImpl(ReadableSource<T> source, PathTarget target, FileNamingScheme fileNamingScheme) {
-     super(source, target, fileNamingScheme);
-   }
-
-   /** Reads the contents through the wrapped {@link ReadableSource}. */
-   @Override
-   public Iterable<T> read(Configuration conf) throws IOException {
-     ReadableSource<T> readable = (ReadableSource<T>) source;
-     return readable.read(conf);
-   }
- }
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
deleted file mode 100644
index f435b3b..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import java.io.IOException;
-
-import org.apache.crunch.Target;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.hadoop.conf.Configuration;
-
- /**
-  * A {@link SourceTargetImpl} built from a {@link ReadableSource}. Its {@link #read} delegates to that source.
-  *
-  * @param <T> element type
-  */
- public class ReadableSourceTargetImpl<T> extends SourceTargetImpl<T> implements ReadableSourceTarget<T> {
-
-   public ReadableSourceTargetImpl(ReadableSource<T> source, Target target) {
-     super(source, target);
-   }
-
-   /** Reads the contents through the wrapped {@link ReadableSource}. */
-   @Override
-   public Iterable<T> read(Configuration conf) throws IOException {
-     ReadableSource<T> readable = (ReadableSource<T>) source;
-     return readable.read(conf);
-   }
- }
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
deleted file mode 100644
index c0d7ce0..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import org.apache.crunch.Source;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.PathTarget;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
- /**
-  * A source/target pair whose target is a {@link PathTarget}. Path-related calls are forwarded to that
-  * target, and the naming scheme is held here.
-  *
-  * @param <T> element type
-  */
- public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements PathTarget {
-
-   private final FileNamingScheme fileNamingScheme;
-
-   public SourcePathTargetImpl(Source<T> source, PathTarget target, FileNamingScheme fileNamingScheme) {
-     super(source, target);
-     this.fileNamingScheme = fileNamingScheme;
-   }
-
-   /** Views the wrapped target as the {@link PathTarget} the constructor was given. */
-   private PathTarget pathTarget() {
-     return (PathTarget) target;
-   }
-
-   @Override
-   public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
-     pathTarget().configureForMapReduce(job, ptype, outputPath, name);
-   }
-
-   @Override
-   public Path getPath() {
-     return pathTarget().getPath();
-   }
-
-   @Override
-   public FileNamingScheme getFileNamingScheme() {
-     return fileNamingScheme;
-   }
- }