You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/10/07 08:47:37 UTC
[2/2] git commit: CRUNCH-275: Support extra config args on Source, Target, and SourceTarget
CRUNCH-275: Support extra config args on Source, Target, and SourceTarget
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d2a979ca
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d2a979ca
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d2a979ca
Branch: refs/heads/master
Commit: d2a979ca6a3cb95e2394f2ba901ca1874ffc49fa
Parents: 9b5e108
Author: Josh Wills <jw...@apache.org>
Authored: Sun Oct 6 15:29:44 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Oct 6 23:30:28 2013 -0700
----------------------------------------------------------------------
.../crunch/contrib/io/jdbc/DataBaseSource.java | 98 +++++++++-----------
.../src/main/java/org/apache/crunch/Source.java | 8 ++
.../java/org/apache/crunch/SourceTarget.java | 6 ++
.../src/main/java/org/apache/crunch/Target.java | 7 ++
.../java/org/apache/crunch/io/FormatBundle.java | 4 +-
.../apache/crunch/io/avro/AvroFileTarget.java | 14 +--
.../crunch/io/avro/trevni/TrevniKeyTarget.java | 2 +-
.../apache/crunch/io/impl/FileSourceImpl.java | 8 +-
.../apache/crunch/io/impl/FileTargetImpl.java | 45 +++++++--
.../apache/crunch/io/impl/SourceTargetImpl.java | 19 ++++
.../apache/crunch/io/text/TextFileTarget.java | 3 +-
.../crunch/io/hbase/HBaseSourceTarget.java | 15 +++
.../org/apache/crunch/io/hbase/HBaseTarget.java | 16 ++++
.../org/apache/crunch/io/hbase/HFileTarget.java | 31 +------
14 files changed, 172 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
index 337ecb7..2c51b84 100644
--- a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
@@ -17,19 +17,14 @@
*/
package org.apache.crunch.contrib.io.jdbc;
-import java.io.IOException;
import java.sql.Driver;
-import org.apache.crunch.Source;
-import org.apache.crunch.io.CrunchInputs;
import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PType;
+import org.apache.crunch.io.impl.FileSourceImpl;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
@@ -44,78 +39,83 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable;
*
* @param <T> The input type of this source
*/
-public class DataBaseSource<T extends DBWritable & Writable> implements Source<T> {
-
- private Class<T> inputClass;
- private PType<T> ptype;
- private String driverClass;
- private String url;
- private String username;
- private String password;
- private String selectClause;
- public String countClause;
-
- private DataBaseSource(Class<T> inputClass) {
- this.inputClass = inputClass;
- this.ptype = Writables.writables(inputClass);
+public class DataBaseSource<T extends DBWritable & Writable> extends FileSourceImpl<T> {
+
+ private DataBaseSource(Class<T> inputClass,
+ String driverClassName,
+ String url,
+ String username,
+ String password,
+ String selectClause,
+ String countClause) {
+ super(
+ new Path("dbsource"),
+ Writables.writables(inputClass),
+ FormatBundle.forInput(DBInputFormat.class)
+ .set(DBConfiguration.DRIVER_CLASS_PROPERTY, driverClassName)
+ .set(DBConfiguration.URL_PROPERTY, url)
+ .set(DBConfiguration.USERNAME_PROPERTY, username)
+ .set(DBConfiguration.PASSWORD_PROPERTY, password)
+ .set(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass.getCanonicalName())
+ .set(DBConfiguration.INPUT_QUERY, selectClause)
+ .set(DBConfiguration.INPUT_COUNT_QUERY, countClause));
}
static class Builder<T extends DBWritable & Writable> {
+ private Class<T> inputClass;
+ private String driverClass;
+ private String url;
+ private String username;
+ private String password;
+ private String selectClause;
+ public String countClause;
+
private DataBaseSource<T> dataBaseSource;
public Builder(Class<T> inputClass) {
- this.dataBaseSource = new DataBaseSource<T>(inputClass);
+ this.inputClass = inputClass;
}
Builder<T> setDriverClass(Class<? extends Driver> driverClass) {
- dataBaseSource.driverClass = driverClass.getName();
+ this.driverClass = driverClass.getName();
return this;
}
Builder<T> setUrl(String url) {
- dataBaseSource.url = url;
+ this.url = url;
return this;
}
Builder<T> setUsername(String username) {
- dataBaseSource.username = username;
+ this.username = username;
return this;
}
Builder<T> setPassword(String password) {
- dataBaseSource.password = password;
+ this.password = password;
return this;
}
Builder<T> selectSQLQuery(String selectClause) {
- dataBaseSource.selectClause = selectClause;
+ this.selectClause = selectClause;
return this;
}
Builder<T> countSQLQuery(String countClause) {
- dataBaseSource.countClause = countClause;
+ this.countClause = countClause;
return this;
}
DataBaseSource<T> build() {
- return dataBaseSource;
- }
- }
-
- @Override
- public void configureSource(Job job, int inputId) throws IOException {
- Configuration configuration = job.getConfiguration();
- DBConfiguration.configureDB(configuration, driverClass, url, username, password);
- if (inputId == -1) {
- job.setInputFormatClass(DBInputFormat.class);
- DBInputFormat.setInput(job, inputClass, selectClause, countClause);
- } else {
- FormatBundle<DBInputFormat> bundle = FormatBundle.forInput(DBInputFormat.class)
- .set(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass.getCanonicalName())
- .set(DBConfiguration.INPUT_QUERY, selectClause)
- .set(DBConfiguration.INPUT_COUNT_QUERY, countClause);
- CrunchInputs.addInputPath(job, new Path("dbsource"), bundle, inputId);
+ return new DataBaseSource<T>(
+ inputClass,
+ driverClass,
+ url,
+ username,
+ password,
+ selectClause,
+ countClause);
}
}
@@ -129,14 +129,4 @@ public class DataBaseSource<T extends DBWritable & Writable> implements Source<T
public long getLastModifiedAt(Configuration configuration) {
return -1;
}
-
- @Override
- public PType<T> getType() {
- return ptype;
- }
-
- @Override
- public Converter<?, ?, ?, ?> getConverter() {
- return ptype.getConverter();
- }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/Source.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Source.java b/crunch-core/src/main/java/org/apache/crunch/Source.java
index b0a0449..b209dfc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Source.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Source.java
@@ -30,6 +30,14 @@ import org.apache.hadoop.mapreduce.Job;
*
*/
public interface Source<T> {
+
+ /**
+ * Adds the given key-value pair to the {@code Configuration} instance that is used to read
+ * this {@code Source<T>}. Allows for multiple inputs to re-use the same config keys with
+ * different values when necessary.
+ */
+ Source<T> inputConf(String key, String value);
+
/**
* Returns the {@code PType} for this source.
*/
http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
index 09c03c6..80cd730 100644
--- a/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
@@ -23,4 +23,10 @@ package org.apache.crunch;
*
*/
public interface SourceTarget<T> extends Source<T>, Target {
+ /**
+ * Adds the given key-value pair to the {@code Configuration} instance(s) that are used to
+ * read and write this {@code SourceTarget<T>}. Allows for multiple inputs and outputs to
+ * re-use the same config keys with different values when necessary.
+ */
+ SourceTarget<T> conf(String key, String value);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/Target.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java
index 65ad67d..112c637 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Target.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Target.java
@@ -61,6 +61,13 @@ public interface Target {
}
/**
+ * Adds the given key-value pair to the {@code Configuration} instance that is used to write
+ * this {@code Target}. Allows for multiple target outputs to re-use the same config keys with
+ * different values when necessary.
+ */
+ Target outputConf(String key, String value);
+
+ /**
* Apply the given {@code WriteMode} to this {@code Target} instance.
*
* @param writeMode The strategy for handling existing outputs
http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
index 4796006..aa84fee 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
@@ -70,8 +70,8 @@ public class FormatBundle<K> implements Serializable, Writable, Configurable {
return new FormatBundle<T>(inputFormatClass);
}
- public static <T extends OutputFormat<?, ?>> FormatBundle<T> forOutput(Class<T> inputFormatClass) {
- return new FormatBundle<T>(inputFormatClass);
+ public static <T extends OutputFormat<?, ?>> FormatBundle<T> forOutput(Class<T> outputFormatClass) {
+ return new FormatBundle<T>(outputFormatClass);
}
public FormatBundle() {
http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
index ea0179f..fc82361 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -20,6 +20,7 @@ package org.apache.crunch.io.avro;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.OutputHandler;
import org.apache.crunch.io.SequentialFileNamingScheme;
import org.apache.crunch.io.impl.FileTargetImpl;
@@ -63,21 +64,16 @@ public class AvroFileTarget extends FileTargetImpl {
@Override
public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
AvroType<?> atype = (AvroType<?>) ptype;
- Configuration conf = job.getConfiguration();
+ FormatBundle bundle = FormatBundle.forOutput(AvroOutputFormat.class);
String schemaParam = null;
if (name == null) {
schemaParam = "avro.output.schema";
} else {
schemaParam = "avro.output.schema." + name;
}
- String outputSchema = conf.get(schemaParam);
- if (outputSchema == null) {
- conf.set(schemaParam, atype.getSchema().toString());
- } else if (!outputSchema.equals(atype.getSchema().toString())) {
- throw new IllegalStateException("Avro targets must use the same output schema");
- }
- Avros.configureReflectDataFactory(conf);
- configureForMapReduce(job, AvroWrapper.class, NullWritable.class, AvroOutputFormat.class,
+ bundle.set(schemaParam, atype.getSchema().toString());
+ Avros.configureReflectDataFactory(job.getConfiguration());
+ configureForMapReduce(job, AvroWrapper.class, NullWritable.class, bundle,
outputPath, name);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
index e1f2ab1..e7acc08 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
@@ -83,7 +83,7 @@ public class TrevniKeyTarget extends FileTargetImpl {
AvroJob.setMapOutputKeySchema(job, atype.getSchema());
Avros.configureReflectDataFactory(conf);
- configureForMapReduce(job, AvroKey.class, NullWritable.class, TrevniOutputFormat.class,
+ configureForMapReduce(job, AvroKey.class, NullWritable.class, FormatBundle.forOutput(TrevniOutputFormat.class),
outputPath, name);
} else {
FormatBundle<TrevniOutputFormat> bundle = FormatBundle.forOutput(
http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index a3cbdc8..766b9b0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -84,7 +84,13 @@ public class FileSourceImpl<T> implements Source<T> {
public List<Path> getPaths() {
return paths;
}
-
+
+ @Override
+ public Source<T> inputConf(String key, String value) {
+ inputBundle.set(key, value);
+ return this;
+ }
+
@Override
public Converter<?, ?, ?, ?> getConverter() {
return ptype.getConverter();
http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index cbd87e3..8ae2589 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -18,17 +18,21 @@
package org.apache.crunch.io.impl;
import java.io.IOException;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.plan.PlanningParameters;
import org.apache.crunch.io.CrunchOutputs;
import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.OutputHandler;
import org.apache.crunch.io.PathTarget;
import org.apache.crunch.io.SourceTargetHelper;
@@ -46,14 +50,30 @@ public class FileTargetImpl implements PathTarget {
private static final Log LOG = LogFactory.getLog(FileTargetImpl.class);
protected final Path path;
- private final Class<? extends FileOutputFormat> outputFormatClass;
+ private final FormatBundle<? extends FileOutputFormat> formatBundle;
private final FileNamingScheme fileNamingScheme;
public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
- FileNamingScheme fileNamingScheme) {
+ FileNamingScheme fileNamingScheme) {
+ this(path, outputFormatClass, fileNamingScheme, ImmutableMap.<String, String>of());
+ }
+
+ public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
+ FileNamingScheme fileNamingScheme, Map<String, String> extraConf) {
this.path = path;
- this.outputFormatClass = outputFormatClass;
+ this.formatBundle = FormatBundle.forOutput(outputFormatClass);
this.fileNamingScheme = fileNamingScheme;
+ if (extraConf != null && !extraConf.isEmpty()) {
+ for (Map.Entry<String, String> e : extraConf.entrySet()) {
+ formatBundle.set(e.getKey(), e.getValue());
+ }
+ }
+ }
+
+ @Override
+ public Target outputConf(String key, String value) {
+ formatBundle.set(key, value);
+ return this;
}
@Override
@@ -61,22 +81,29 @@ public class FileTargetImpl implements PathTarget {
Converter converter = ptype.getConverter();
Class keyClass = converter.getKeyClass();
Class valueClass = converter.getValueClass();
- configureForMapReduce(job, keyClass, valueClass, outputFormatClass, outputPath, name);
+ configureForMapReduce(job, keyClass, valueClass, formatBundle, outputPath, name);
}
+ @Deprecated
protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
Class outputFormatClass, Path outputPath, String name) {
+ configureForMapReduce(job, keyClass, valueClass, FormatBundle.forOutput(outputFormatClass), outputPath, name);
+ }
+
+ protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
+ FormatBundle formatBundle, Path outputPath, String name) {
try {
FileOutputFormat.setOutputPath(job, outputPath);
} catch (Exception e) {
throw new RuntimeException(e);
}
if (name == null) {
- job.setOutputFormatClass(outputFormatClass);
+ job.setOutputFormatClass(formatBundle.getFormatClass());
+ formatBundle.configure(job.getConfiguration());
job.setOutputKeyClass(keyClass);
job.setOutputValueClass(valueClass);
} else {
- CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
+ CrunchOutputs.addNamedOutput(job, name, formatBundle, keyClass, valueClass);
}
}
@@ -185,7 +212,11 @@ public class FileTargetImpl implements PathTarget {
@Override
public String toString() {
- return new StringBuilder().append(outputFormatClass.getSimpleName()).append("(").append(path).append(")")
+ return new StringBuilder()
+ .append(formatBundle.getFormatClass().getSimpleName())
+ .append("(")
+ .append(path)
+ .append(")")
.toString();
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
index 68c9430..b15a00b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
@@ -40,6 +40,12 @@ class SourceTargetImpl<T> implements SourceTarget<T> {
}
@Override
+ public Source<T> inputConf(String key, String value) {
+ source.inputConf(key, value);
+ return this;
+ }
+
+ @Override
public PType<T> getType() {
return source.getType();
}
@@ -87,6 +93,12 @@ class SourceTargetImpl<T> implements SourceTarget<T> {
}
@Override
+ public Target outputConf(String key, String value) {
+ target.outputConf(key, value);
+ return this;
+ }
+
+ @Override
public boolean handleExisting(WriteMode strategy, long lastModifiedAt, Configuration conf) {
return target.handleExisting(strategy, lastModifiedAt, conf);
}
@@ -105,4 +117,11 @@ class SourceTargetImpl<T> implements SourceTarget<T> {
public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) {
return target.getConverter(ptype);
}
+
+ @Override
+ public SourceTarget<T> conf(String key, String value) {
+ source.inputConf(key, value);
+ target.outputConf(key, value);
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
index 17ae7a6..4b9197b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -20,6 +20,7 @@ package org.apache.crunch.io.text;
import org.apache.avro.Schema;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.SequentialFileNamingScheme;
import org.apache.crunch.io.impl.FileTargetImpl;
import org.apache.crunch.types.Converter;
@@ -72,7 +73,7 @@ public class TextFileTarget extends FileTargetImpl {
Converter converter = ptype.getConverter();
Class keyClass = converter.getKeyClass();
Class valueClass = converter.getValueClass();
- configureForMapReduce(job, keyClass, valueClass, getOutputFormat(ptype), outputPath, name);
+ configureForMapReduce(job, keyClass, valueClass, FormatBundle.forOutput(getOutputFormat(ptype)), outputPath, name);
}
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index c003e48..1b2a03e 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -26,6 +26,8 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.Pair;
+import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
import org.apache.crunch.TableSource;
import org.apache.crunch.impl.mr.run.CrunchMapper;
import org.apache.crunch.io.CrunchInputs;
@@ -73,6 +75,12 @@ public class HBaseSourceTarget extends HBaseTarget implements
}
@Override
+ public Source<Pair<ImmutableBytesWritable, Result>> inputConf(String key, String value) {
+ inputBundle.set(key, value);
+ return this;
+ }
+
+ @Override
public PType<Pair<ImmutableBytesWritable, Result>> getType() {
return PTYPE;
}
@@ -146,6 +154,13 @@ public class HBaseSourceTarget extends HBaseTarget implements
return new HTableIterable(htable, scan);
}
+ @Override
+ public SourceTarget<Pair<ImmutableBytesWritable, Result>> conf(String key, String value) {
+ inputConf(key, value);
+ outputConf(key, value);
+ return this;
+ }
+
private static class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> {
private final HTable table;
private final Scan scan;
http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index 69a260e..2c3c239 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -18,12 +18,15 @@
package org.apache.crunch.io.hbase;
import java.io.IOException;
+import java.util.Map;
+import com.google.common.collect.Maps;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
import org.apache.crunch.io.CrunchOutputs;
import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.MapReduceTarget;
@@ -45,6 +48,7 @@ public class HBaseTarget implements MapReduceTarget {
private static final Log LOG = LogFactory.getLog(HBaseTarget.class);
protected String table;
+ private Map<String, String> extraConf = Maps.newHashMap();
public HBaseTarget(String table) {
this.table = table;
@@ -100,10 +104,16 @@ public class HBaseTarget implements MapReduceTarget {
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(typeClass);
conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+ for (Map.Entry<String, String> e : extraConf.entrySet()) {
+ conf.set(e.getKey(), e.getValue());
+ }
} else {
FormatBundle<TableOutputFormat> bundle = FormatBundle.forOutput(
TableOutputFormat.class);
bundle.set(TableOutputFormat.OUTPUT_TABLE, table);
+ for (Map.Entry<String, String> e : extraConf.entrySet()) {
+ bundle.set(e.getKey(), e.getValue());
+ }
CrunchOutputs.addNamedOutput(job, name,
bundle,
ImmutableBytesWritable.class,
@@ -117,6 +127,12 @@ public class HBaseTarget implements MapReduceTarget {
}
@Override
+ public Target outputConf(String key, String value) {
+ extraConf.put(key, value);
+ return this;
+ }
+
+ @Override
public boolean handleExisting(WriteMode strategy, long lastModifiedAt, Configuration conf) {
LOG.info("HBaseTarget ignores checks for existing outputs...");
return false;
http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
index 1cef4fa..0a78bd8 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class HFileTarget extends FileTargetImpl {
private static final HColumnDescriptor DEFAULT_COLUMN_DESCRIPTOR = new HColumnDescriptor();
- private final HColumnDescriptor hcol;
public HFileTarget(String path) {
this(new Path(path));
@@ -45,34 +44,8 @@ public class HFileTarget extends FileTargetImpl {
public HFileTarget(Path path, HColumnDescriptor hcol) {
super(path, HFileOutputFormatForCrunch.class, SequentialFileNamingScheme.getInstance());
- this.hcol = Preconditions.checkNotNull(hcol);
- }
-
- @Override
- protected void configureForMapReduce(
- Job job,
- Class keyClass,
- Class valueClass,
- Class outputFormatClass,
- Path outputPath,
- String name) {
- try {
- FileOutputFormat.setOutputPath(job, outputPath);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- String hcolStr = Hex.encodeHexString(WritableUtils.toByteArray(hcol));
- if (name == null) {
- job.setOutputFormatClass(HFileOutputFormatForCrunch.class);
- job.setOutputKeyClass(keyClass);
- job.setOutputValueClass(valueClass);
- job.getConfiguration().set(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, hcolStr);
- } else {
- FormatBundle<HFileOutputFormatForCrunch> bundle = FormatBundle.forOutput(HFileOutputFormatForCrunch.class);
- bundle.set(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, hcolStr);
- CrunchOutputs.addNamedOutput(job, name, bundle, keyClass, valueClass);
- }
+ Preconditions.checkNotNull(hcol);
+ outputConf(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, Hex.encodeHexString(WritableUtils.toByteArray(hcol)));
}
@Override