You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2014/10/09 06:21:50 UTC
git commit: HBASE-12201 Close the writers in the MOB sweep tool
(Jingcheng Du)
Repository: hbase
Updated Branches:
refs/heads/hbase-11339 b72eb7f92 -> aa523164e
HBASE-12201 Close the writers in the MOB sweep tool (Jingcheng Du)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aa523164
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aa523164
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aa523164
Branch: refs/heads/hbase-11339
Commit: aa523164e825567d93eb0b2a191955ca195ea242
Parents: b72eb7f
Author: Ramkrishna <ra...@intel.com>
Authored: Thu Oct 9 09:50:39 2014 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Thu Oct 9 09:50:39 2014 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/mob/MobUtils.java | 4 +--
.../hadoop/hbase/mob/mapreduce/SweepJob.java | 29 ++++++++++++++------
.../hbase/mob/mapreduce/SweepReducer.java | 14 ++++++----
3 files changed, 30 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/aa523164/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index 4001520..0f6aa6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
public class MobUtils {
private static final Log LOG = LogFactory.getLog(MobUtils.class);
- private static final String COMPACTION_WORKING_DIR_NAME = "working";
private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
new ThreadLocal<SimpleDateFormat>() {
@@ -360,8 +359,7 @@ public class MobUtils {
* @return The directory of the mob compaction for the current job.
*/
public static Path getCompactionWorkingPath(Path root, String jobName) {
- Path parent = new Path(root, jobName);
- return new Path(parent, COMPACTION_WORKING_DIR_NAME);
+ return new Path(root, jobName);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/aa523164/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
index 1d048bb..2ab12d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -57,6 +58,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapreduce.Job;
@@ -288,9 +290,6 @@ public class SweepJob {
job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
Path vistiedFileNamesPath = new Path(workingPathOfNames, WORKING_VISITED_DIR);
job.getConfiguration().set(WORKING_VISITED_DIR_KEY, vistiedFileNamesPath.toString());
- // create a file includes all the existing mob files whose creation time is older than
- // (now - oneDay)
- fs.create(allFileNamesPath, true);
// create a directory where the files contain names of visited mob files are saved.
fs.mkdirs(vistiedFileNamesPath);
Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName);
@@ -311,15 +310,25 @@ public class SweepJob {
}
}
}
- // write the names to a sequence file
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, job.getConfiguration(),
- allFileNamesPath, String.class, String.class);
+ FSDataOutputStream fout = null;
+ SequenceFile.Writer writer = null;
try {
+ // create a file includes all the existing mob files whose creation time is older than
+ // (now - oneDay)
+ fout = fs.create(allFileNamesPath, true);
+ // write the names to a sequence file
+ writer = SequenceFile.createWriter(job.getConfiguration(), fout, String.class, String.class,
+ CompressionType.NONE, null);
for (String fileName : fileNames) {
writer.append(fileName, MobConstants.EMPTY_STRING);
}
} finally {
- IOUtils.closeStream(writer);
+ if (writer != null) {
+ IOUtils.closeStream(writer);
+ }
+ if (fout != null) {
+ IOUtils.closeStream(fout);
+ }
}
}
@@ -366,7 +375,7 @@ public class SweepJob {
} while (nextAll != null || nextVisited != null);
} finally {
if (allNamesReader != null) {
- allNamesReader.close();
+ IOUtils.closeStream(allNamesReader);
}
if (visitedNamesReader != null) {
visitedNamesReader.close();
@@ -517,7 +526,9 @@ public class SweepJob {
public void close() {
for (SequenceFile.Reader reader : readers) {
- IOUtils.closeStream(reader);
+ if (reader != null) {
+ IOUtils.closeStream(reader);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/aa523164/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
index 04fe359..ab8379e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -62,6 +63,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
@@ -149,6 +151,7 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
new DummyMobAbortable());
+ FSDataOutputStream fout = null;
try {
SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, jobId);
tracker.start();
@@ -157,11 +160,9 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY);
Path nameFilePath = new Path(dir, UUID.randomUUID().toString()
.replace("-", MobConstants.EMPTY_STRING));
- if (!fs.exists(nameFilePath)) {
- fs.create(nameFilePath, true);
- }
- writer = SequenceFile.createWriter(fs, context.getConfiguration(), nameFilePath,
- String.class, String.class);
+ fout = fs.create(nameFilePath, true);
+ writer = SequenceFile.createWriter(context.getConfiguration(), fout, String.class,
+ String.class, CompressionType.NONE, null);
SweepPartitionId id;
SweepPartition partition = null;
// the mob files which have the same start key and date are in the same partition.
@@ -195,6 +196,9 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
if (writer != null) {
IOUtils.closeStream(writer);
}
+ if (fout != null) {
+ IOUtils.closeStream(fout);
+ }
if (table != null) {
try {
table.close();