You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/04/13 20:41:33 UTC
git commit: SQOOP-844: Sqoop2: HdfsExportPartitioner is not always respecting maximal number of partitions
Updated Branches:
refs/heads/sqoop2 3865f7dee -> ae075f2e3
SQOOP-844: Sqoop2: HdfsExportPartitioner is not always respecting maximal number of partitions
(Vasanth kumar RJ via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ae075f2e
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ae075f2e
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ae075f2e
Branch: refs/heads/sqoop2
Commit: ae075f2e3769da9e43736a865f51eca96c8db61a
Parents: 3865f7d
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Sat Apr 13 11:41:04 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Sat Apr 13 11:41:04 2013 -0700
----------------------------------------------------------------------
.../sqoop/job/etl/HdfsExportPartitioner.java | 4 ++
.../java/org/apache/sqoop/job/TestHdfsExtract.java | 29 +++++++++++++++
2 files changed, 33 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae075f2e/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
index 115ca54..b3590dc 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
@@ -73,6 +73,10 @@ public class HdfsExportPartitioner extends Partitioner {
long numInputBytes = getInputSize(conf);
maxSplitSize = numInputBytes / context.getMaxPartitions();
+ if(numInputBytes % context.getMaxPartitions() != 0 ) {
+ maxSplitSize += 1;
+ }
+
long minSizeNode = 0;
long minSizeRack = 0;
long maxSize = 0;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae075f2e/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index 62f3a03..b3e6050 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -21,6 +21,7 @@ import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
+import java.util.List;
import junit.framework.TestCase;
@@ -38,6 +39,8 @@ import org.apache.sqoop.job.etl.HdfsExportPartitioner;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.junit.Test;
@@ -54,6 +57,32 @@ public class TestHdfsExtract extends TestCase {
indir = INPUT_ROOT + getClass().getSimpleName();
}
+ /**
+ * Test case validating the number of partitions created
+ * for the given input.
+ * Succeeds if the size of the partition list is less than or equal to
+ * the given maximum number of partitions.
+ * @throws Exception
+ */
+ @Test
+ public void testHdfsExportPartitioner() throws Exception {
+ FileUtils.delete(indir);
+ FileUtils.mkdirs(indir);
+ createTextInput(null);
+ Configuration conf = new Configuration();
+ conf.set(JobConstants.HADOOP_INPUTDIR, indir);
+
+ HdfsExportPartitioner partitioner = new HdfsExportPartitioner();
+ PrefixContext prefixContext = new PrefixContext(conf, "");
+ int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17};
+
+ for(int maxPartitions : partitionValues) {
+ PartitionerContext partCont = new PartitionerContext(prefixContext, maxPartitions);
+ List<Partition> partitionList = partitioner.getPartitions(partCont, null, null);
+ assertTrue(partitionList.size()<=maxPartitions);
+ }
+ }
+
@Test
public void testUncompressedText() throws Exception {
FileUtils.delete(indir);