You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/08/16 05:25:07 UTC
git commit: TEZ-1423. Ability to pass custom properties to keySerializer for OnFileUnorderedPartitionedKVOutput (Siddharth Seth via bikas)
Repository: tez
Updated Branches:
refs/heads/master 469bf9052 -> 98875db09
TEZ-1423. Ability to pass custom properties to keySerializer for OnFileUnorderedPartitionedKVOutput (Siddharth Seth via bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/98875db0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/98875db0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/98875db0
Branch: refs/heads/master
Commit: 98875db09ece09acee71190489d40ce17ba239c3
Parents: 469bf90
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Aug 15 20:25:12 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Aug 15 20:25:12 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../HadoopKeyValuesBasedBaseEdgeConfigurer.java | 10 +++++--
.../conf/OrderedGroupedKVInputConfigurer.java | 29 ++++++++++++++++--
.../OrderedPartitionedKVEdgeConfigurer.java | 26 ++++++++++------
.../OrderedPartitionedKVOutputConfigurer.java | 29 ++++++++++++++++--
.../library/conf/UnorderedKVEdgeConfigurer.java | 27 +++++++++++------
.../conf/UnorderedKVInputConfigurer.java | 30 +++++++++++++++++--
.../conf/UnorderedKVOutputConfigurer.java | 30 +++++++++++++++++--
.../UnorderedPartitionedKVEdgeConfigurer.java | 25 ++++++++++------
.../UnorderedPartitionedKVOutputConfigurer.java | 31 +++++++++++++++++---
.../conf/TestOnFileSortedOutputConfigurer.java | 2 +-
.../TestOnFileUnorderedKVOutputConfigurer.java | 2 +-
...eUnorderedPartitionedKVOutputConfigurer.java | 6 ++--
.../TestOrderedPartitionedKVEdgeConfigurer.java | 8 ++---
.../conf/TestShuffledMergedInputConfigurer.java | 2 +-
.../TestShuffledUnorderedKVInputConfigurer.java | 2 +-
...estUnorderedPartitionedKVEdgeConfigurer.java | 4 +--
...tUnorderedUnpartitionedKVEdgeConfigurer.java | 2 +-
18 files changed, 207 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2ef940b..c96e3fe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -67,6 +67,8 @@ INCOMPATIBLE CHANGES
TEZ-1438. Annotate add java doc for tez-runtime-library and tez-mapreduce
TEZ-1055. Rename tez-mapreduce-examples to tez-examples
TEZ-1132. Consistent naming of Input and Outputs
+ TEZ-1423. Ability to pass custom properties to keySerializer for
+ OnFileUnorderedPartitionedKVOutput
Release 0.4.0-incubating: 2014-04-05
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
index 81c1185..15be335 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
@@ -20,6 +20,8 @@ package org.apache.tez.runtime.library.conf;
import javax.annotation.Nullable;
+import java.util.Map;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.tez.dag.api.UserPayload;
@@ -58,11 +60,13 @@ abstract class HadoopKeyValuesBasedBaseEdgeConfigurer {
* @param enabled whether to enable compression or not
* @param compressionCodec the codec to be used if compression is enabled. null implies using
* the default
+ * @param codecConf the codec configuration. This can be null, and is a {@link
+ * java.util.Map} of key-value pairs. The keys should be limited to
+ * the ones required by the comparator.
* @return instance of the current builder
*/
- public T setCompression(boolean enabled, @Nullable String compressionCodec) {
- return (T) this;
- }
+ public abstract T setCompression(boolean enabled, @Nullable String compressionCodec,
+ @Nullable Map<String, String> codecConf);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java
index 016cbfd..cc6e44b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java
@@ -435,12 +435,18 @@ public class OrderedGroupedKVInputConfigurer {
return this;
}
- public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+ public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+ @Nullable Map<String, String> codecConf) {
this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
if (enabled && compressionCodec != null) {
this.conf
.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
}
+ if (codecConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ }
return this;
}
@@ -451,10 +457,13 @@ public class OrderedGroupedKVInputConfigurer {
*
* @param serializationClassName
* @param comparatorClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
public Builder setKeySerializationClass(String serializationClassName,
- String comparatorClassName) {
+ String comparatorClassName, @Nullable Map<String, String> serializerConf) {
Preconditions.checkArgument(serializationClassName != null,
"serializationClassName cannot be null");
Preconditions.checkArgument(comparatorClassName != null,
@@ -462,6 +471,11 @@ public class OrderedGroupedKVInputConfigurer {
this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+ conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
setKeyComparatorClass(comparatorClassName, null);
+ if (serializerConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ }
return this;
}
@@ -469,13 +483,22 @@ public class OrderedGroupedKVInputConfigurer {
* Serialization class to be used for serializing values.
*
* @param serializationClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
- public Builder setValueSerializationClass(String serializationClassName) {
+ public Builder setValueSerializationClass(String serializationClassName,
+ @Nullable Map<String, String> serializerConf) {
Preconditions.checkArgument(serializationClassName != null,
"serializationClassName cannot be null");
this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+ conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+ if (serializerConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ }
return this;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
index a9c18b1..3abaf47 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
@@ -206,12 +206,15 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
*
* @param serializationClassName
* @param comparatorClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
public Builder setKeySerializationClass(String serializationClassName,
- String comparatorClassName) {
- outputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName);
- inputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName);
+ String comparatorClassName, @Nullable Map<String, String> serializerConf) {
+ outputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName, serializerConf);
+ inputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName, serializerConf);
return this;
}
@@ -219,19 +222,24 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
* Set serialization class responsible for providing serializer/deserializer for values.
*
* @param serializationClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
- public Builder setValueSerializationClass(String serializationClassName) {
- outputBuilder.setValueSerializationClass(serializationClassName);
- inputBuilder.setValueSerializationClass(serializationClassName);
+ public Builder setValueSerializationClass(String serializationClassName,
+ @Nullable Map<String, String> serializerConf) {
+ outputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
+ inputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
return this;
}
@Override
- public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
- outputBuilder.setCompression(enabled, compressionCodec);
- inputBuilder.setCompression(enabled, compressionCodec);
+ public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+ @Nullable Map<String, String> codecConf) {
+ outputBuilder.setCompression(enabled, compressionCodec, codecConf);
+ inputBuilder.setCompression(enabled, compressionCodec, codecConf);
return this;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java
index 72063e0..dd73c29 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java
@@ -361,12 +361,18 @@ public class OrderedPartitionedKVOutputConfigurer {
return this;
}
- public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+ public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+ @Nullable Map<String, String> codecConf) {
this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
if (enabled && compressionCodec != null) {
this.conf
.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
}
+ if (codecConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ }
return this;
}
@@ -377,10 +383,13 @@ public class OrderedPartitionedKVOutputConfigurer {
*
* @param serializationClassName
* @param comparatorClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
public Builder setKeySerializationClass(String serializationClassName,
- String comparatorClassName) {
+ String comparatorClassName, @Nullable Map<String, String> serializerConf) {
Preconditions.checkArgument(serializationClassName != null,
"serializationClassName cannot be null");
Preconditions.checkArgument(comparatorClassName != null,
@@ -388,6 +397,11 @@ public class OrderedPartitionedKVOutputConfigurer {
this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+ conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
setKeyComparatorClass(comparatorClassName, null);
+ if (serializerConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ }
return this;
}
@@ -395,13 +409,22 @@ public class OrderedPartitionedKVOutputConfigurer {
* Set serialization class responsible for providing serializer/deserializer for values.
*
* @param serializationClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
- public Builder setValueSerializationClass(String serializationClassName) {
+ public Builder setValueSerializationClass(String serializationClassName,
+ @Nullable Map<String, String> serializerConf) {
Preconditions.checkArgument(serializationClassName != null,
"serializationClassName cannot be null");
this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+ conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+ if (serializerConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ }
return this;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java
index 1e0e56c..e96e0ef 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java
@@ -171,9 +171,10 @@ public class UnorderedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfi
}
@Override
- public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
- outputBuilder.setCompression(enabled, compressionCodec);
- inputBuilder.setCompression(enabled, compressionCodec);
+ public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+ @Nullable Map<String, String> codecConf) {
+ outputBuilder.setCompression(enabled, compressionCodec, codecConf);
+ inputBuilder.setCompression(enabled, compressionCodec, codecConf);
return this;
}
@@ -203,11 +204,15 @@ public class UnorderedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfi
* the corresponding comparator class to be used as key comparator.
*
* @param serializationClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
- public Builder setKeySerializationClass(String serializationClassName) {
- outputBuilder.setKeySerializationClass(serializationClassName);
- inputBuilder.setKeySerializationClass(serializationClassName);
+ public Builder setKeySerializationClass(String serializationClassName,
+ @Nullable Map<String, String> serializerConf) {
+ outputBuilder.setKeySerializationClass(serializationClassName, serializerConf);
+ inputBuilder.setKeySerializationClass(serializationClassName, serializerConf);
return this;
}
@@ -215,11 +220,15 @@ public class UnorderedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfi
* Set serialization class responsible for providing serializer/deserializer for values.
*
* @param serializationClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
- public Builder setValueSerializationClass(String serializationClassName) {
- outputBuilder.setValueSerializationClass(serializationClassName);
- inputBuilder.setValueSerializationClass(serializationClassName);
+ public Builder setValueSerializationClass(String serializationClassName,
+ @Nullable Map<String, String> serializerConf) {
+ outputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
+ inputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
return this;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java
index 2203928..8771c97 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java
@@ -284,12 +284,18 @@ public class UnorderedKVInputConfigurer {
return this;
}
- public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+ public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+ @Nullable Map<String, String> codecConf) {
this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
if (enabled && compressionCodec != null) {
this.conf
.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
}
+ if (codecConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ }
return this;
}
@@ -298,13 +304,22 @@ public class UnorderedKVInputConfigurer {
* the corresponding comparator class to be used as key comparator.
*
* @param serializationClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
- public Builder setKeySerializationClass(String serializationClassName) {
+ public Builder setKeySerializationClass(String serializationClassName,
+ @Nullable Map<String, String> serializerConf) {
Preconditions.checkArgument(serializationClassName != null,
"serializationClassName cannot be null");
this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+ conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+ if (serializerConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ }
return this;
}
@@ -312,13 +327,22 @@ public class UnorderedKVInputConfigurer {
* Set serialization class responsible for providing serializer/deserializer for values.
*
* @param serializationClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
- public Builder setValueSerializationClass(String serializationClassName) {
+ public Builder setValueSerializationClass(String serializationClassName,
+ @Nullable Map<String, String> serializerConf) {
Preconditions.checkArgument(serializationClassName != null,
"serializationClassName cannot be null");
this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+ conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+ if (serializerConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ }
return this;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfigurer.java
index 986ee45..2ef4350 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfigurer.java
@@ -214,13 +214,22 @@ public class UnorderedKVOutputConfigurer {
* Set serialization class responsible for providing serializer/deserializer for keys.
*
* @param serializationClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
- public Builder setKeySerializationClass(String serializationClassName) {
+ public Builder setKeySerializationClass(String serializationClassName,
+ @Nullable Map<String, String> serializerConf) {
Preconditions.checkArgument(serializationClassName != null,
"serializationClassName cannot be null");
this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+ conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+ if (serializerConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ }
return this;
}
@@ -228,22 +237,37 @@ public class UnorderedKVOutputConfigurer {
* Set serialization class responsible for providing serializer/deserializer for values.
*
* @param serializationClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
- public Builder setValueSerializationClass(String serializationClassName) {
+ public Builder setValueSerializationClass(String serializationClassName,
+ @Nullable Map<String, String> serializerConf) {
Preconditions.checkArgument(serializationClassName != null,
"serializationClassName cannot be null");
this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+ conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+ if (serializerConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ }
return this;
}
- public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+ public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+ @Nullable Map<String, String> codecConf) {
this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
if (enabled && compressionCodec != null) {
this.conf
.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
}
+ if (codecConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ }
return this;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
index 30e141c..e56a927 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
@@ -177,9 +177,9 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
}
@Override
- public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
- outputBuilder.setCompression(enabled, compressionCodec);
- inputBuilder.setCompression(enabled, compressionCodec);
+ public Builder setCompression(boolean enabled, @Nullable String compressionCodec, @Nullable Map<String, String> codecConf) {
+ outputBuilder.setCompression(enabled, compressionCodec, codecConf);
+ inputBuilder.setCompression(enabled, compressionCodec, codecConf);
return this;
}
@@ -208,11 +208,15 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
* Set serialization class responsible for providing serializer/deserializer for keys.
*
* @param serializationClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
- public Builder setKeySerializationClass(String serializationClassName) {
- outputBuilder.setKeySerializationClass(serializationClassName);
- inputBuilder.setKeySerializationClass(serializationClassName);
+ public Builder setKeySerializationClass(String serializationClassName,
+ @Nullable Map<String, String> serializerConf) {
+ outputBuilder.setKeySerializationClass(serializationClassName, serializerConf);
+ inputBuilder.setKeySerializationClass(serializationClassName, serializerConf);
return this;
}
@@ -220,11 +224,14 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
* Set serialization class responsible for providing serializer/deserializer for values.
*
* @param serializationClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
- public Builder setValueSerializationClass(String serializationClassName) {
- outputBuilder.setValueSerializationClass(serializationClassName);
- inputBuilder.setValueSerializationClass(serializationClassName);
+ public Builder setValueSerializationClass(String serializationClassName, @Nullable Map<String, String> serializerConf) {
+ outputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
+ inputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
return this;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfigurer.java
index 8033c95..9417fbd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfigurer.java
@@ -255,12 +255,18 @@ public class UnorderedPartitionedKVOutputConfigurer {
return this;
}
- public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+ public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+ @Nullable Map<String, String> codecConf) {
this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
if (enabled && compressionCodec != null) {
this.conf
.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
}
+ if (codecConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ }
return this;
}
@@ -268,27 +274,44 @@ public class UnorderedPartitionedKVOutputConfigurer {
* Set serialization class responsible for providing serializer/deserializer for keys.
*
* @param serializationClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
- public Builder setKeySerializationClass(String serializationClassName) {
+ public Builder setKeySerializationClass(String serializationClassName,
+ @Nullable Map<String, String> serializerConf) {
Preconditions.checkArgument(serializationClassName != null,
"serializationClassName cannot be null");
this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+ conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
- return this;
+ if (serializerConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ } return this;
}
/**
* Set serialization class responsible for providing serializer/deserializer for values.
*
* @param serializationClassName
+ * @param serializerConf the serializer configuration. This can be null, and is a
+ * {@link java.util.Map} of key-value pairs. The keys should be limited
+ * to the ones required by the comparator.
* @return
*/
- public Builder setValueSerializationClass(String serializationClassName) {
+ public Builder setValueSerializationClass(String serializationClassName,
+ @Nullable Map<String, String> serializerConf) {
Preconditions.checkArgument(serializationClassName != null,
"serializationClassName cannot be null");
this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+ conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+ if (serializerConf != null) {
+ // Merging the confs for now. Change to be specific in the future.
+ ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+ TezRuntimeConfiguration.getRuntimeConfigKeySet());
+ }
return this;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileSortedOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileSortedOutputConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileSortedOutputConfigurer.java
index 970f500..f0a6bf8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileSortedOutputConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileSortedOutputConfigurer.java
@@ -75,7 +75,7 @@ public class TestOnFileSortedOutputConfigurer {
OrderedPartitionedKVOutputConfigurer.Builder builder =
OrderedPartitionedKVOutputConfigurer.newBuilder("KEY", "VALUE", "PARTITIONER", null)
.setKeyComparatorClass("KEY_COMPARATOR", null)
- .setCompression(true, "CustomCodec")
+ .setCompression(true, "CustomCodec", null)
.setSortBufferSize(2048)
.setAdditionalConfiguration("test.key.1", "key1")
.setAdditionalConfiguration("file.shouldExist", "file")
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedKVOutputConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedKVOutputConfigurer.java
index 2422203..bd182ec 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedKVOutputConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedKVOutputConfigurer.java
@@ -65,7 +65,7 @@ public class TestOnFileUnorderedKVOutputConfigurer {
additionalConf.put("file.shouldExist", "file");
UnorderedKVOutputConfigurer.Builder builder =
UnorderedKVOutputConfigurer.newBuilder("KEY", "VALUE")
- .setCompression(true, "CustomCodec")
+ .setCompression(true, "CustomCodec", null)
.setAdditionalConfiguration("fs.shouldExist", "fs")
.setAdditionalConfiguration("test.key.1", "key1")
.setAdditionalConfiguration(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutputConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutputConfigurer.java
index 73cab11..0ce96eb 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutputConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutputConfigurer.java
@@ -76,7 +76,7 @@ public class TestOnFileUnorderedPartitionedKVOutputConfigurer {
UnorderedPartitionedKVOutputConfigurer.Builder builder =
UnorderedPartitionedKVOutputConfigurer.newBuilder("KEY", "VALUE", "PARTITIONER",
null)
- .setCompression(true, "CustomCodec")
+ .setCompression(true, "CustomCodec", null)
.setAvailableBufferSize(1111)
.setAdditionalConfiguration("fs.shouldExist", "fs")
.setAdditionalConfiguration("test.key.1", "key1")
@@ -123,8 +123,8 @@ public class TestOnFileUnorderedPartitionedKVOutputConfigurer {
UnorderedPartitionedKVOutputConfigurer.Builder builder =
UnorderedPartitionedKVOutputConfigurer
.newBuilder("KEY", "VALUE", "PARTITIONER", null)
- .setKeySerializationClass("SerClass1")
- .setValueSerializationClass("SerClass2");
+ .setKeySerializationClass("SerClass1", null)
+ .setValueSerializationClass("SerClass2", null);
UnorderedPartitionedKVOutputConfigurer configuration = builder.build();
UnorderedPartitionedKVOutputConfigurer rebuilt =
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
index 6f91cbc..924b988 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
@@ -186,7 +186,7 @@ public class TestOrderedPartitionedKVEdgeConfigurer {
.configureOutput().setSortBufferSize(1111).setSorterNumThreads(2).done()
.configureInput().setMaxSingleMemorySegmentFraction(0.11f).setMergeFraction(0.22f)
.setPostMergeBufferFraction(0.33f).setShuffleBufferFraction(0.44f).done()
- .setCompression(true, "CustomCodec");
+ .setCompression(true, "CustomCodec", null);
OrderedPartitionedKVEdgeConfigurer configuration = builder.build();
@@ -244,9 +244,9 @@ public class TestOrderedPartitionedKVEdgeConfigurer {
public void testSerialization() {
OrderedPartitionedKVEdgeConfigurer.Builder builder = OrderedPartitionedKVEdgeConfigurer
.newBuilder("KEY", "VALUE", "PARTITIONER")
- .setCompression(true, "CustomCodec")
- .setKeySerializationClass("serClass1", "SomeComparator1")
- .setValueSerializationClass("serClass2");
+ .setCompression(true, "CustomCodec", null)
+ .setKeySerializationClass("serClass1", "SomeComparator1", null)
+ .setValueSerializationClass("serClass2", null);
OrderedPartitionedKVEdgeConfigurer configuration = builder.build();
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfigurer.java
index 4560556..a805601 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfigurer.java
@@ -65,7 +65,7 @@ public class TestShuffledMergedInputConfigurer {
OrderedGroupedKVInputConfigurer.Builder builder =
OrderedGroupedKVInputConfigurer.newBuilder("KEY", "VALUE")
.setKeyComparatorClass("KEY_COMPARATOR", null)
- .setCompression(true, "CustomCodec")
+ .setCompression(true, "CustomCodec", null)
.setMaxSingleMemorySegmentFraction(0.11f)
.setMergeFraction(0.22f)
.setPostMergeBufferFraction(0.33f)
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfigurer.java
index 3f980a4..d4324d3 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfigurer.java
@@ -63,7 +63,7 @@ public class TestShuffledUnorderedKVInputConfigurer {
additionalConf.put("file.shouldExist", "file");
UnorderedKVInputConfigurer.Builder builder =
UnorderedKVInputConfigurer.newBuilder("KEY", "VALUE")
- .setCompression(true, "CustomCodec")
+ .setCompression(true, "CustomCodec", null)
.setMaxSingleMemorySegmentFraction(0.11f)
.setMergeFraction(0.22f)
.setShuffleBufferFraction(0.33f)
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
index 03b218e..796a5b0 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
@@ -63,8 +63,8 @@ public class TestUnorderedPartitionedKVEdgeConfigurer {
public void testDefaultConfigsUsed() {
UnorderedPartitionedKVEdgeConfigurer.Builder builder =
UnorderedPartitionedKVEdgeConfigurer.newBuilder("KEY", "VALUE", "PARTITIONER");
- builder.setKeySerializationClass("SerClass1");
- builder.setValueSerializationClass("SerClass2");
+ builder.setKeySerializationClass("SerClass1", null);
+ builder.setValueSerializationClass("SerClass2", null);
UnorderedPartitionedKVEdgeConfigurer configuration = builder.build();
http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
index caa2364..2215557 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
@@ -56,7 +56,7 @@ public class TestUnorderedUnpartitionedKVEdgeConfigurer {
public void testDefaultConfigsUsed() {
UnorderedKVEdgeConfigurer.Builder builder =
UnorderedKVEdgeConfigurer.newBuilder("KEY", "VALUE");
- builder.setKeySerializationClass("SerClass1").setValueSerializationClass("SerClass2");
+ builder.setKeySerializationClass("SerClass1", null).setValueSerializationClass("SerClass2", null);
UnorderedKVEdgeConfigurer configuration = builder.build();