You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/02/04 20:19:58 UTC
svn commit: r1442303 - in
/accumulo/trunk/server/src/main/java/org/apache/accumulo/server:
logger/LogReader.java tabletserver/log/DfsLogger.java
tabletserver/log/LogSorter.java
Author: ecn
Date: Mon Feb 4 19:19:58 2013
New Revision: 1442303
URL: http://svn.apache.org/viewvc?rev=1442303&view=rev
Log:
ACCUMULO-1032 move header reading to a utility method and use it in LogReader
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java?rev=1442303&r1=1442302&r2=1442303&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java Mon Feb 4 19:19:58 2013
@@ -19,8 +19,10 @@ package org.apache.accumulo.server.logge
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -31,6 +33,7 @@ import org.apache.accumulo.core.data.Mut
import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger;
import org.apache.accumulo.server.tabletserver.log.MultiReader;
import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.hadoop.conf.Configuration;
@@ -93,33 +96,42 @@ public class LogReader {
for (String file : opts.files) {
+ Map<String, String> meta = new HashMap<String, String>();
Path path = new Path(file);
LogFileKey key = new LogFileKey();
LogFileValue value = new LogFileValue();
if (fs.isFile(path)) {
// read log entries from a simple hdfs file
- FSDataInputStream f = fs.open(path);
- while (true) {
- try {
- key.readFields(f);
- value.readFields(f);
- } catch (EOFException ex) {
- break;
+ FSDataInputStream f = DfsLogger.readHeader(fs, path, meta);
+ try {
+ while (true) {
+ try {
+ key.readFields(f);
+ value.readFields(f);
+ } catch (EOFException ex) {
+ break;
+ }
+ printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
}
- printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+ } finally {
+ f.close();
}
} else if (local.isFile(path)) {
// read log entries from a simple file
- FSDataInputStream f = fs.open(path);
- while (true) {
- try {
- key.readFields(f);
- value.readFields(f);
- } catch (EOFException ex) {
- break;
+ FSDataInputStream f = DfsLogger.readHeader(fs, path, meta);
+ try {
+ while (true) {
+ try {
+ key.readFields(f);
+ value.readFields(f);
+ } catch (EOFException ex) {
+ break;
+ }
+ printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
}
- printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+ } finally {
+ f.close();
}
} else {
// read the log entries sorted in a map file
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1442303&r1=1442302&r2=1442303&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Mon Feb 4 19:19:58 2013
@@ -48,6 +48,7 @@ import org.apache.accumulo.server.logger
import org.apache.accumulo.server.logger.LogFileValue;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.tabletserver.TabletMutations;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -216,6 +217,30 @@ public class DfsLogger {
this.logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()), filename);
}
+ public static FSDataInputStream readHeader(FileSystem fs, Path path, Map<String, String> opts) throws IOException {
+ FSDataInputStream file = fs.open(path);
+ try {
+ byte[] magic = LOG_FILE_HEADER_V2.getBytes();
+ byte[] buffer = new byte[magic.length];
+ int read = file.read(buffer);
+ if (read == magic.length && Arrays.equals(buffer, magic)) {
+ int count = file.readInt();
+ for (int i = 0; i < count; i++) {
+ String key = file.readUTF();
+ String value = file.readUTF();
+ opts.put(key, value);
+ }
+ } else {
+ file.seek(0);
+ return file;
+ }
+ return file;
+ } catch (IOException ex) {
+ file.seek(0);
+ return file;
+ }
+ }
+
public synchronized void open(String address) throws IOException {
String filename = UUID.randomUUID().toString();
logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
@@ -241,7 +266,7 @@ public class DfsLogger {
CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
// Initialize the log file with a header and the crypto params used to set up this log file.
- logFile.writeUTF(LOG_FILE_HEADER_V2);
+ logFile.write(LOG_FILE_HEADER_V2.getBytes());
Map<String,String> cryptoOpts = conf.getConfiguration().getAllPropertiesWithPrefix(Property.CRYPTO_PREFIX);
logFile.writeInt(cryptoOpts.size());
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1442303&r1=1442302&r2=1442303&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Mon Feb 4 19:19:58 2013
@@ -111,17 +111,13 @@ public class LogSorter {
FSDataInputStream tmpInput = fs.open(srcPath);
DataInputStream tmpDecryptingInput = tmpInput;
- String logHeader = tmpInput.readUTF();
Map<String,String> cryptoOpts = new HashMap<String,String>();
+ tmpInput = DfsLogger.readHeader(fs, srcPath, cryptoOpts);
- if (!logHeader.equals(DfsLogger.LOG_FILE_HEADER_V2)) {
+ if (!cryptoOpts.containsKey(Property.CRYPTO_MODULE_CLASS.getKey())) {
log.debug("Not a V2 log file, so re-opening it and passing it on");
- // Hmmm, this isn't the log file I was expecting, so close it and reopen to unread those bytes.
- tmpInput.close();
- tmpInput = fs.open(srcPath);
-
synchronized (this) {
this.input = tmpInput;
this.decryptingInput = tmpInput;
@@ -129,11 +125,6 @@ public class LogSorter {
} else {
- int numEntries = tmpInput.readInt();
- for (int i = 0; i < numEntries; i++) {
- cryptoOpts.put(tmpInput.readUTF(), tmpInput.readUTF());
- }
-
String cryptoModuleName = cryptoOpts.get(Property.CRYPTO_MODULE_CLASS.getKey());
if (cryptoModuleName == null) {
// If for whatever reason we didn't get a configured crypto module (old log file version, for instance)