You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2014/10/09 06:21:50 UTC

git commit: HBASE-12201 Close the writers in the MOB sweep tool (Jingcheng Du)

Repository: hbase
Updated Branches:
  refs/heads/hbase-11339 b72eb7f92 -> aa523164e


HBASE-12201 Close the writers in the MOB sweep tool (Jingcheng Du)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aa523164
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aa523164
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aa523164

Branch: refs/heads/hbase-11339
Commit: aa523164e825567d93eb0b2a191955ca195ea242
Parents: b72eb7f
Author: Ramkrishna <ra...@intel.com>
Authored: Thu Oct 9 09:50:39 2014 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Thu Oct 9 09:50:39 2014 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/mob/MobUtils.java   |  4 +--
 .../hadoop/hbase/mob/mapreduce/SweepJob.java    | 29 ++++++++++++++------
 .../hbase/mob/mapreduce/SweepReducer.java       | 14 ++++++----
 3 files changed, 30 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/aa523164/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index 4001520..0f6aa6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
 public class MobUtils {
 
   private static final Log LOG = LogFactory.getLog(MobUtils.class);
-  private static final String COMPACTION_WORKING_DIR_NAME = "working";
 
   private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
       new ThreadLocal<SimpleDateFormat>() {
@@ -360,8 +359,7 @@ public class MobUtils {
    * @return The directory of the mob compaction for the current job.
    */
   public static Path getCompactionWorkingPath(Path root, String jobName) {
-    Path parent = new Path(root, jobName);
-    return new Path(parent, COMPACTION_WORKING_DIR_NAME);
+    return new Path(root, jobName);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/aa523164/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
index 1d048bb..2ab12d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,6 +58,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.serializer.JavaSerialization;
 import org.apache.hadoop.io.serializer.WritableSerialization;
 import org.apache.hadoop.mapreduce.Job;
@@ -288,9 +290,6 @@ public class SweepJob {
     job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
     Path vistiedFileNamesPath = new Path(workingPathOfNames, WORKING_VISITED_DIR);
     job.getConfiguration().set(WORKING_VISITED_DIR_KEY, vistiedFileNamesPath.toString());
-    // create a file includes all the existing mob files whose creation time is older than
-    // (now - oneDay)
-    fs.create(allFileNamesPath, true);
     // create a directory where the files contain names of visited mob files are saved.
     fs.mkdirs(vistiedFileNamesPath);
     Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName);
@@ -311,15 +310,25 @@ public class SweepJob {
         }
       }
     }
-    // write the names to a sequence file
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, job.getConfiguration(),
-        allFileNamesPath, String.class, String.class);
+    FSDataOutputStream fout = null;
+    SequenceFile.Writer writer = null;
     try {
+      // create a file includes all the existing mob files whose creation time is older than
+      // (now - oneDay)
+      fout = fs.create(allFileNamesPath, true);
+      // write the names to a sequence file
+      writer = SequenceFile.createWriter(job.getConfiguration(), fout, String.class, String.class,
+          CompressionType.NONE, null);
       for (String fileName : fileNames) {
         writer.append(fileName, MobConstants.EMPTY_STRING);
       }
     } finally {
-      IOUtils.closeStream(writer);
+      if (writer != null) {
+        IOUtils.closeStream(writer);
+      }
+      if (fout != null) {
+        IOUtils.closeStream(fout);
+      }
     }
   }
 
@@ -366,7 +375,7 @@ public class SweepJob {
       } while (nextAll != null || nextVisited != null);
     } finally {
       if (allNamesReader != null) {
-        allNamesReader.close();
+        IOUtils.closeStream(allNamesReader);
       }
       if (visitedNamesReader != null) {
         visitedNamesReader.close();
@@ -517,7 +526,9 @@ public class SweepJob {
 
     public void close() {
       for (SequenceFile.Reader reader : readers) {
-        IOUtils.closeStream(reader);
+        if (reader != null) {
+          IOUtils.closeStream(reader);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/aa523164/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
index 04fe359..ab8379e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -62,6 +63,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -149,6 +151,7 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
     String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
         new DummyMobAbortable());
+    FSDataOutputStream fout = null;
     try {
       SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, jobId);
       tracker.start();
@@ -157,11 +160,9 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
       String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY);
       Path nameFilePath = new Path(dir, UUID.randomUUID().toString()
           .replace("-", MobConstants.EMPTY_STRING));
-      if (!fs.exists(nameFilePath)) {
-        fs.create(nameFilePath, true);
-      }
-      writer = SequenceFile.createWriter(fs, context.getConfiguration(), nameFilePath,
-          String.class, String.class);
+      fout = fs.create(nameFilePath, true);
+      writer = SequenceFile.createWriter(context.getConfiguration(), fout, String.class,
+          String.class, CompressionType.NONE, null);
       SweepPartitionId id;
       SweepPartition partition = null;
       // the mob files which have the same start key and date are in the same partition.
@@ -195,6 +196,9 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
       if (writer != null) {
         IOUtils.closeStream(writer);
       }
+      if (fout != null) {
+        IOUtils.closeStream(fout);
+      }
       if (table != null) {
         try {
           table.close();