You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/02/04 20:19:58 UTC

svn commit: r1442303 - in /accumulo/trunk/server/src/main/java/org/apache/accumulo/server: logger/LogReader.java tabletserver/log/DfsLogger.java tabletserver/log/LogSorter.java

Author: ecn
Date: Mon Feb  4 19:19:58 2013
New Revision: 1442303

URL: http://svn.apache.org/viewvc?rev=1442303&view=rev
Log:
ACCUMULO-1032 move header reading to a utility method and use it in LogReader 

Modified:
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java?rev=1442303&r1=1442302&r2=1442303&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java Mon Feb  4 19:19:58 2013
@@ -19,8 +19,10 @@ package org.apache.accumulo.server.logge
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -31,6 +33,7 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger;
 import org.apache.accumulo.server.tabletserver.log.MultiReader;
 import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.hadoop.conf.Configuration;
@@ -93,33 +96,42 @@ public class LogReader {
     
     for (String file : opts.files) {
       
+      Map<String, String> meta = new HashMap<String, String>();
       Path path = new Path(file);
       LogFileKey key = new LogFileKey();
       LogFileValue value = new LogFileValue();
       
       if (fs.isFile(path)) {
         // read log entries from a simple hdfs file
-        FSDataInputStream f = fs.open(path);
-        while (true) {
-          try {
-            key.readFields(f);
-            value.readFields(f);
-          } catch (EOFException ex) {
-            break;
+        FSDataInputStream f = DfsLogger.readHeader(fs, path, meta);
+        try {
+          while (true) {
+            try {
+              key.readFields(f);
+              value.readFields(f);
+            } catch (EOFException ex) {
+              break;
+            }
+            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
           }
-          printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+        } finally {
+          f.close();
         }
       } else if (local.isFile(path)) {
         // read log entries from a simple file
-        FSDataInputStream f = fs.open(path);
-        while (true) {
-          try {
-            key.readFields(f);
-            value.readFields(f);
-          } catch (EOFException ex) {
-            break;
+        FSDataInputStream f = DfsLogger.readHeader(fs, path, meta);
+        try {
+          while (true) {
+            try {
+              key.readFields(f);
+              value.readFields(f);
+            } catch (EOFException ex) {
+              break;
+            }
+            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
           }
-          printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+        } finally {
+          f.close();
         }
       } else {
         // read the log entries sorted in a map file

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1442303&r1=1442302&r2=1442303&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Mon Feb  4 19:19:58 2013
@@ -48,6 +48,7 @@ import org.apache.accumulo.server.logger
 import org.apache.accumulo.server.logger.LogFileValue;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.tabletserver.TabletMutations;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -216,6 +217,30 @@ public class DfsLogger {
     this.logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()), filename);
   }
   
+  public static FSDataInputStream readHeader(FileSystem fs, Path path, Map<String, String> opts) throws IOException { // opens path, loads an optional V2 header's key/value pairs into opts, and returns the still-open stream (it is not closed here)
+    FSDataInputStream file = fs.open(path);
+    try {
+      byte[] magic = LOG_FILE_HEADER_V2.getBytes(); // expected leading bytes; no-arg getBytes() encodes with the platform default charset
+      byte[] buffer = new byte[magic.length];
+      int read = file.read(buffer); // NOTE(review): a single read() may return fewer bytes than requested; a short read falls into the no-header branch below
+      if (read == magic.length && Arrays.equals(buffer, magic)) { // header is recognized only on a full-length, byte-for-byte match
+        int count = file.readInt(); // number of key/value pairs that follow the magic bytes
+        for (int i = 0; i < count; i++) {
+          String key = file.readUTF();
+          String value = file.readUTF();
+          opts.put(key, value); // entries are added to the caller's map as they are read
+        }
+      } else { // magic not found: treat the file as having no header
+        file.seek(0); // rewind so the caller reads from the first byte of the file
+        return file;
+      }
+      return file; // header consumed; the stream is positioned just past the last key/value pair
+    } catch (IOException ex) { // failure while reading the header: fall back to the start of the file; opts may already hold some entries
+      file.seek(0); // NOTE(review): seek() can itself throw IOException, in which case the stream is left open and the exception propagates
+      return file;
+    }
+  }
+  
   public synchronized void open(String address) throws IOException {
     String filename = UUID.randomUUID().toString();
     logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
@@ -241,7 +266,7 @@ public class DfsLogger {
       CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
       
       // Initialize the log file with a header and the crypto params used to set up this log file.
-      logFile.writeUTF(LOG_FILE_HEADER_V2);
+      logFile.write(LOG_FILE_HEADER_V2.getBytes());
       Map<String,String> cryptoOpts = conf.getConfiguration().getAllPropertiesWithPrefix(Property.CRYPTO_PREFIX);
       
       logFile.writeInt(cryptoOpts.size());

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1442303&r1=1442302&r2=1442303&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Mon Feb  4 19:19:58 2013
@@ -111,17 +111,13 @@ public class LogSorter {
         FSDataInputStream tmpInput = fs.open(srcPath);
         DataInputStream tmpDecryptingInput = tmpInput;
         
-        String logHeader = tmpInput.readUTF();
         Map<String,String> cryptoOpts = new HashMap<String,String>();
+        tmpInput = DfsLogger.readHeader(fs, srcPath, cryptoOpts);
         
-        if (!logHeader.equals(DfsLogger.LOG_FILE_HEADER_V2)) {
+        if (!cryptoOpts.containsKey(Property.CRYPTO_MODULE_CLASS.getKey())) {
           
           log.debug("Not a V2 log file, so re-opening it and passing it on");
           
-          // Hmmm, this isn't the log file I was expecting, so close it and reopen to unread those bytes.
-          tmpInput.close();
-          tmpInput = fs.open(srcPath);
-          
           synchronized (this) {
             this.input = tmpInput;
             this.decryptingInput = tmpInput;
@@ -129,11 +125,6 @@ public class LogSorter {
           
         } else {
           
-          int numEntries = tmpInput.readInt();
-          for (int i = 0; i < numEntries; i++) {
-            cryptoOpts.put(tmpInput.readUTF(), tmpInput.readUTF());
-          }
-          
           String cryptoModuleName = cryptoOpts.get(Property.CRYPTO_MODULE_CLASS.getKey());
           if (cryptoModuleName == null) {
             // If for whatever reason we didn't get a configured crypto module (old log file version, for instance)