You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/03/15 20:45:39 UTC
svn commit: r1457079 - in /hbase/branches/0.95:
hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
Author: stack
Date: Fri Mar 15 19:45:39 2013
New Revision: 1457079
URL: http://svn.apache.org/r1457079
Log:
HBASE-4285 partitions file created in user's home directory by importtsv
Modified:
hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
Modified: hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java?rev=1457079&r1=1457078&r2=1457079&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java (original)
+++ hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java Fri Mar 15 19:45:39 2013
@@ -5,6 +5,7 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
@@ -15,6 +16,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
@@ -25,6 +27,7 @@ import org.apache.hadoop.hbase.client.HT
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -137,6 +140,18 @@ public class IntegrationTestImportTsv im
}
}
+ /**
+ * Assert that the {@link TotalOrderPartitioner} partitions file is gone (distributed cluster only).
+ */
+ protected static void validateDeletedPartitionsFile(Configuration conf) throws IOException {
+ if (!conf.getBoolean(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, false)) {
+ return;
+ }
+ Path partitionsFile = new Path(TotalOrderPartitioner.getPartitionFile(conf));
+ FileSystem fs = partitionsFile.getFileSystem(conf); // FS of the path's own scheme, not just the default FS
+ assertFalse("Failed to clean up partitions file.", fs.exists(partitionsFile));
+ }
+
@Test
public void testGenerateAndLoad() throws Exception {
String table = NAME + "-" + UUID.randomUUID();
@@ -155,8 +170,13 @@ public class IntegrationTestImportTsv im
// run the job, complete the load.
util.createTable(table, cf);
- TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
+ Tool t = TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
doLoadIncrementalHFiles(hfiles, table);
+
+ // validate post-conditions
+ validateDeletedPartitionsFile(t.getConf());
+
+ // clean up after ourselves.
util.deleteTable(table);
util.cleanupDataTestDirOnTestFS(table);
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1457079&r1=1457078&r2=1457079&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Fri Mar 15 19:45:39 2013
@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.mapreduc
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
@@ -37,7 +35,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -54,9 +51,9 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
@@ -267,13 +264,12 @@ public class HFileOutputFormat extends F
}
/**
- * Write out a SequenceFile that can be read by TotalOrderPartitioner
- * that contains the split points in startKeys.
- * @param partitionsPath output path for SequenceFile
- * @param startKeys the region start keys
+ * Write out a {@link SequenceFile} that can be read by
+ * {@link TotalOrderPartitioner} that contains the split points in startKeys.
*/
private static void writePartitions(Configuration conf, Path partitionsPath,
List<ImmutableBytesWritable> startKeys) throws IOException {
+ LOG.info("Writing partition information to " + partitionsPath);
if (startKeys.isEmpty()) {
throw new IllegalArgumentException("No regions passed");
}
@@ -325,7 +321,6 @@ public class HFileOutputFormat extends F
throws IOException {
Configuration conf = job.getConfiguration();
- job.setPartitionerClass(TotalOrderPartitioner.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(HFileOutputFormat.class);
@@ -341,29 +336,14 @@ public class HFileOutputFormat extends F
LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}
+ // Use table's region boundaries for TOP split points.
LOG.info("Looking up current regions for table " + table);
List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
"to match current region count");
job.setNumReduceTasks(startKeys.size());
- Path partitionsPath = new Path(job.getWorkingDirectory(),
- "partitions_" + UUID.randomUUID());
- LOG.info("Writing partition information to " + partitionsPath);
-
- FileSystem fs = partitionsPath.getFileSystem(conf);
- writePartitions(conf, partitionsPath, startKeys);
- partitionsPath.makeQualified(fs);
-
- URI cacheUri;
- try {
- cacheUri = new URI(partitionsPath.toString() + "#" + TotalOrderPartitioner.DEFAULT_PATH);
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
- DistributedCache.addCacheFile(cacheUri, conf);
- DistributedCache.createSymlink(conf);
-
+ configurePartitioner(job, startKeys);
// Set compression algorithms based on column families
configureCompression(table, conf);
configureBloomType(table, conf);
@@ -415,7 +395,26 @@ public class HFileOutputFormat extends F
}
return confValMap;
}
-
+
+ /**
+ * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
+ * <code>splitPoints</code>. The partitions file is registered via FileSystem#deleteOnExit,
+ * so it is removed when the client's FileSystem is closed, not when the job finishes.
+ */
+ static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
+ throws IOException {
+ Configuration conf = job.getConfiguration();
+ // create the partitions file; Path is immutable, so keep the qualified copy makeQualified returns
+ FileSystem fs = FileSystem.get(conf);
+ Path partitionsPath = fs.makeQualified(
+ new Path("/tmp", "partitions_" + UUID.randomUUID()));
+ fs.deleteOnExit(partitionsPath);
+ writePartitions(conf, partitionsPath, splitPoints);
+ // configure job to use the fully-qualified partitions file
+ job.setPartitionerClass(TotalOrderPartitioner.class);
+ TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
+ }
+
/**
* Serialize column family to compression algorithm map to configuration.
* Invoked while configuring the MR job for incremental load.