You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by af...@apache.org on 2012/07/05 23:03:21 UTC
svn commit: r1357909 [2/3] - in /accumulo/branches/ACCUMULO-652: ./ bin/
core/ core/src/main/java/org/apache/accumulo/core/
core/src/main/java/org/apache/accumulo/core/client/
core/src/main/java/org/apache/accumulo/core/client/admin/
core/src/main/java...
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java Thu Jul 5 21:03:16 2012
@@ -20,10 +20,10 @@ import java.io.IOException;
import java.util.List;
import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.log4j.Logger;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java Thu Jul 5 21:03:16 2012
@@ -28,14 +28,14 @@ import org.apache.accumulo.core.Constant
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.util.TablePropUtil;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter.Mutator;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Thu Jul 5 21:03:16 2012
@@ -16,13 +16,19 @@
*/
package org.apache.accumulo.server.master.tableOps;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -33,6 +39,8 @@ import org.apache.accumulo.cloudtrace.in
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.impl.ServerClient;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.thrift.ClientService;
@@ -42,30 +50,34 @@ import org.apache.accumulo.core.client.i
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.thrift.AuthInfo;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.fate.Repo;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.tabletserver.UniqueNameAllocator;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.util.MetadataTable;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
@@ -133,9 +145,8 @@ public class BulkImport extends MasterRe
Utils.getReadLock(tableId, tid).lock();
// check that the error directory exists and is empty
- FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
- ServerConfiguration.getSiteConfiguration()));
- ;
+ FileSystem fs = master.getFileSystem();
+
Path errorPath = new Path(errorDir);
FileStatus errorStatus = fs.getFileStatus(errorPath);
if (errorStatus == null)
@@ -273,24 +284,6 @@ class CleanUpBulkImport extends MasterRe
}
@Override
- public long isReady(long tid, Master master) throws Exception {
- Set<TServerInstance> finished = new HashSet<TServerInstance>();
- Set<TServerInstance> running = master.onlineTabletServers();
- for (TServerInstance server : running) {
- try {
- TServerConnection client = master.getConnection(server);
- if (client != null && !client.isActive(tid))
- finished.add(server);
- } catch (TException ex) {
- log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
- }
- }
- if (finished.containsAll(running))
- return 0;
- return 1000;
- }
-
- @Override
public Repo<Master> call(long tid, Master master) throws Exception {
log.debug("removing the bulk processing flag file in " + bulk);
Path bulkDir = new Path(bulk);
@@ -327,8 +320,124 @@ class CompleteBulkImport extends MasterR
@Override
public Repo<Master> call(long tid, Master master) throws Exception {
ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
+ return new CopyFailed(tableId, source, bulk, error);
+ }
+}
+
+class CopyFailed extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private String tableId;
+ private String source;
+ private String bulk;
+ private String error;
+
+ public CopyFailed(String tableId, String source, String bulk, String error) {
+ this.tableId = tableId;
+ this.source = source;
+ this.bulk = bulk;
+ this.error = error;
+ }
+
+ @Override
+ public long isReady(long tid, Master master) throws Exception {
+ Set<TServerInstance> finished = new HashSet<TServerInstance>();
+ Set<TServerInstance> running = master.onlineTabletServers();
+ for (TServerInstance server : running) {
+ try {
+ TServerConnection client = master.getConnection(server);
+ if (client != null && !client.isActive(tid))
+ finished.add(server);
+ } catch (TException ex) {
+ log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
+ }
+ }
+ if (finished.containsAll(running))
+ return 0;
+ return 500;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master environment) throws Exception {
+ // This needs to execute after the arbitrator is stopped
+
+ FileSystem fs = environment.getFileSystem();
+
+ if (!fs.exists(new Path(error, "failures.txt")))
+ return new CleanUpBulkImport(tableId, source, bulk, error);
+
+ HashMap<String,String> failures = new HashMap<String,String>();
+ HashMap<String,String> loadedFailures = new HashMap<String,String>();
+
+ FSDataInputStream failFile = fs.open(new Path(error, "failures.txt"));
+ BufferedReader in = new BufferedReader(new InputStreamReader(failFile));
+ try {
+ String line = null;
+ while ((line = in.readLine()) != null) {
+ Path path = new Path(line);
+ if (!fs.exists(new Path(error, path.getName())))
+ failures.put("/" + path.getParent().getName() + "/" + path.getName(), line);
+ }
+ } finally {
+ failFile.close();
+ }
+
+ /*
+ * I thought I could move files that have no file references in the table. However, it's possible a clone references a file. Therefore only move files that
+ * have no loaded markers.
+ */
+
+ // determine which failed files were loaded
+ AuthInfo creds = SecurityConstants.getSystemCredentials();
+ Connector conn = HdfsZooInstance.getInstance().getConnector(creds.user, creds.password);
+ Scanner mscanner = new IsolatedScanner(conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS));
+ mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+ mscanner.fetchColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY);
+
+ for (Entry<Key,Value> entry : mscanner) {
+ if (Long.parseLong(entry.getValue().toString()) == tid) {
+ String loadedFile = entry.getKey().getColumnQualifierData().toString();
+ String absPath = failures.remove(loadedFile);
+ if (absPath != null) {
+ loadedFailures.put(loadedFile, absPath);
+ }
+ }
+ }
+
+ // move failed files that were not loaded
+ for (String failure : failures.values()) {
+ Path orig = new Path(failure);
+ Path dest = new Path(error, orig.getName());
+ fs.rename(orig, dest);
+ log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": failed");
+ }
+
+ if (loadedFailures.size() > 0) {
+ DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
+ + Constants.ZBULK_FAILED_COPYQ);
+
+ HashSet<String> workIds = new HashSet<String>();
+
+ for (String failure : loadedFailures.values()) {
+ Path orig = new Path(failure);
+ Path dest = new Path(error, orig.getName());
+
+ if (fs.exists(dest))
+ continue;
+
+ bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes());
+ workIds.add(orig.getName());
+ log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
+ }
+
+ bifCopyQueue.waitUntilDone(workIds);
+ }
+
+ fs.delete(new Path(error, "failures.txt"), true);
return new CleanUpBulkImport(tableId, source, bulk, error);
}
+
}
class LoadFiles extends MasterRepo {
@@ -375,8 +484,7 @@ class LoadFiles extends MasterRepo {
public Repo<Master> call(final long tid, final Master master) throws Exception {
initializeThreadPool(master);
final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
- FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
- ServerConfiguration.getSiteConfiguration()));
+ FileSystem fs = master.getFileSystem();
List<FileStatus> files = new ArrayList<FileStatus>();
for (FileStatus entry : fs.listStatus(new Path(bulk))) {
files.add(entry);
@@ -448,23 +556,18 @@ class LoadFiles extends MasterRepo {
UtilWaitThread.sleep(100);
}
}
- // Copy/Create failed file markers
- for (String f : filesToLoad) {
- Path orig = new Path(f);
- Path dest = new Path(errorDir, orig.getName());
- try {
- FileUtil.copy(fs, orig, fs, dest, false, true, CachedConfiguration.getInstance());
- log.debug("tid " + tid + " copied " + orig + " to " + dest + ": failed");
- } catch (IOException ex) {
- try {
- fs.create(dest).close();
- log.debug("tid " + tid + " marked " + dest + " failed");
- } catch (IOException e) {
- log.error("Unable to create failure flag file " + dest, e);
- }
+
+ FSDataOutputStream failFile = fs.create(new Path(errorDir, "failures.txt"), true);
+ BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile));
+ try {
+ for (String f : filesToLoad) {
+ out.write(f);
+ out.write("\n");
}
+ } finally {
+ out.close();
}
-
+
// return the next step, which will perform cleanup
return new CompleteBulkImport(tableId, source, bulk, errorDir);
}
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/ChangeTableState.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/ChangeTableState.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/ChangeTableState.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/ChangeTableState.java Thu Jul 5 21:03:16 2012
@@ -18,7 +18,7 @@ package org.apache.accumulo.server.maste
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.server.fate.Repo;
+import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.tables.TableManager;
import org.apache.log4j.Logger;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CloneTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CloneTable.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CloneTable.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CloneTable.java Thu Jul 5 21:03:16 2012
@@ -26,9 +26,9 @@ import org.apache.accumulo.core.client.i
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.fate.Repo;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.tables.TableManager;
import org.apache.accumulo.server.security.SecurityConstants;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java Thu Jul 5 21:03:16 2012
@@ -41,16 +41,16 @@ import org.apache.accumulo.core.master.s
import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
import org.apache.accumulo.core.tabletserver.thrift.TIteratorSetting;
import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;
import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.fate.Repo;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.util.MapCounter;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter.Mutator;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java Thu Jul 5 21:03:16 2012
@@ -31,11 +31,11 @@ import org.apache.accumulo.core.file.Fil
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.fate.Repo;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.tables.TableManager;
import org.apache.accumulo.server.security.Authenticator;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java Thu Jul 5 21:03:16 2012
@@ -36,9 +36,9 @@ import org.apache.accumulo.core.data.Val
import org.apache.accumulo.core.iterators.user.GrepIterator;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.fate.Repo;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.MetaDataTableScanner;
import org.apache.accumulo.server.master.state.TabletLocationState;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/MasterRepo.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/MasterRepo.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/MasterRepo.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/MasterRepo.java Thu Jul 5 21:03:16 2012
@@ -16,7 +16,7 @@
*/
package org.apache.accumulo.server.master.tableOps;
-import org.apache.accumulo.server.fate.Repo;
+import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.server.master.Master;
import org.apache.log4j.Logger;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/RenameTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/RenameTable.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/RenameTable.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/RenameTable.java Thu Jul 5 21:03:16 2012
@@ -23,12 +23,12 @@ import org.apache.accumulo.core.client.i
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;
import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.fate.Repo;
import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter.Mutator;
import org.apache.log4j.Logger;
public class RenameTable extends MasterRepo {
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java Thu Jul 5 21:03:16 2012
@@ -26,7 +26,7 @@ import org.apache.accumulo.core.client.i
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.util.TextUtil;
-import org.apache.accumulo.server.fate.Repo;
+import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.MergeInfo;
import org.apache.accumulo.server.master.state.MergeInfo.Operation;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java Thu Jul 5 21:03:16 2012
@@ -20,7 +20,7 @@ import org.apache.accumulo.cloudtrace.in
import org.apache.accumulo.cloudtrace.instrument.Trace;
import org.apache.accumulo.cloudtrace.instrument.Tracer;
import org.apache.accumulo.cloudtrace.thrift.TInfo;
-import org.apache.accumulo.server.fate.Repo;
+import org.apache.accumulo.fate.Repo;
/**
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/Utils.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/Utils.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/Utils.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/Utils.java Thu Jul 5 21:03:16 2012
@@ -27,13 +27,13 @@ import org.apache.accumulo.core.client.i
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;
+import org.apache.accumulo.fate.zookeeper.ZooReservation;
import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.zookeeper.DistributedReadWriteLock;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooQueueLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.server.zookeeper.ZooReservation;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter.Mutator;
import org.apache.commons.codec.binary.Base64;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java Thu Jul 5 21:03:16 2012
@@ -20,14 +20,14 @@ import org.apache.accumulo.core.Constant
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.server.fate.Repo;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.master.EventCoordinator.Listener;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.tableOps.MasterRepo;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.log4j.Logger;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java Thu Jul 5 21:03:16 2012
@@ -44,9 +44,9 @@ abstract public class BasicServlet exten
private static final long serialVersionUID = 1L;
protected static final Logger log = Logger.getLogger(BasicServlet.class);
static String cachedInstanceName = null;
- private String bannerText;
- private String bannerColor;
- private String bannerBackground;
+ private static String bannerText;
+ private static String bannerColor;
+ private static String bannerBackground;
abstract protected String getTitle(HttpServletRequest req);
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/DefaultServlet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/DefaultServlet.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/DefaultServlet.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/DefaultServlet.java Thu Jul 5 21:03:16 2012
@@ -83,13 +83,17 @@ public class DefaultServlet extends Basi
path = path.substring(1);
InputStream data = BasicServlet.class.getClassLoader().getResourceAsStream(path);
ServletOutputStream out = resp.getOutputStream();
- if (data != null) {
- byte[] buffer = new byte[1024];
- int n;
- while ((n = data.read(buffer)) > 0)
- out.write(buffer, 0, n);
- } else {
- out.write(("could not get resource " + path + "").getBytes());
+ try {
+ if (data != null) {
+ byte[] buffer = new byte[1024];
+ int n;
+ while ((n = data.read(buffer)) > 0)
+ out.write(buffer, 0, n);
+ } else {
+ out.write(("could not get resource " + path + "").getBytes());
+ }
+ } finally {
+ data.close();
}
} catch (Throwable t) {
log.error(t, t);
@@ -113,9 +117,10 @@ public class DefaultServlet extends Basi
@Override
public IOException run() {
+ InputStream data = null;
try {
File file = new File(aHome + path);
- InputStream data = new FileInputStream(file.getAbsolutePath());
+ data = new FileInputStream(file.getAbsolutePath());
byte[] buffer = new byte[1024];
int n;
ServletOutputStream out = resp.getOutputStream();
@@ -124,6 +129,14 @@ public class DefaultServlet extends Basi
return null;
} catch (IOException e) {
return e;
+ } finally {
+ if (data != null) {
+ try {
+ data.close();
+ } catch (IOException ex) {
+ log.error(ex, ex);
+ }
+ }
}
}
}, acc);
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/VisServlet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/VisServlet.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/VisServlet.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/VisServlet.java Thu Jul 5 21:03:16 2012
@@ -32,11 +32,6 @@ public class VisServlet extends BasicSer
private static final int concurrentScans = Monitor.getSystemConfiguration().getCount(Property.TSERV_READ_AHEAD_MAXCONCURRENT);
private static final long serialVersionUID = 1L;
- boolean useCircles;
- StatType motion;
- StatType color;
- int spacing;
- String url;
public enum StatType {
osload(ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors(), true, 100, "OS Load"),
@@ -106,6 +101,14 @@ public class VisServlet extends BasicSer
return count;
}
}
+
+ public static class VisualizationConfig {
+ boolean useCircles = true;
+ StatType motion = StatType.allmax;
+ StatType color = StatType.allavg;
+ int spacing = 40;
+ String url;
+ }
@Override
protected String getTitle(HttpServletRequest req) {
@@ -116,39 +119,36 @@ public class VisServlet extends BasicSer
protected void pageBody(HttpServletRequest req, HttpServletResponse response, StringBuilder sb) throws IOException {
StringBuffer urlsb = req.getRequestURL();
urlsb.setLength(urlsb.lastIndexOf("/") + 1);
- url = urlsb.toString();
+ String url = urlsb.toString();
+ VisualizationConfig cfg = new VisualizationConfig();
- useCircles = true;
String s = req.getParameter("shape");
if (s != null && (s.equals("square") || s.equals("squares"))) {
- useCircles = false;
+ cfg.useCircles = false;
}
s = req.getParameter("motion");
- motion = StatType.allmax;
if (s != null) {
try {
- motion = StatType.valueOf(s);
+ cfg.motion = StatType.valueOf(s);
} catch (Exception e) {}
}
s = req.getParameter("color");
- color = StatType.allavg;
if (s != null) {
try {
- color = StatType.valueOf(s);
+ cfg.color = StatType.valueOf(s);
} catch (Exception e) {}
}
- spacing = 40;
String size = req.getParameter("size");
if (size != null) {
if (size.equals("10"))
- spacing = 10;
+ cfg.spacing = 10;
else if (size.equals("20"))
- spacing = 20;
+ cfg.spacing = 20;
else if (size.equals("80"))
- spacing = 80;
+ cfg.spacing = 80;
}
ArrayList<TabletServerStatus> tservers = new ArrayList<TabletServerStatus>();
@@ -158,30 +158,30 @@ public class VisServlet extends BasicSer
if (tservers.size() == 0)
return;
- int width = (int) Math.ceil(Math.sqrt(tservers.size())) * spacing;
- int height = (int) Math.ceil(tservers.size() / width) * spacing;
- doSettings(sb, width < 640 ? 640 : width, height < 640 ? 640 : height);
- doScript(sb, tservers);
+ int width = (int) Math.ceil(Math.sqrt(tservers.size())) * cfg.spacing;
+ int height = (int) Math.ceil(tservers.size() / width) * cfg.spacing;
+ doSettings(sb, cfg, width < 640 ? 640 : width, height < 640 ? 640 : height);
+ doScript(sb, cfg, tservers);
}
- private void doSettings(StringBuilder sb, int width, int height) {
+ private void doSettings(StringBuilder sb, VisualizationConfig cfg, int width, int height) {
sb.append("<div class='left'>\n");
sb.append("<div id='parameters' class='nowrap'>\n");
// shape select box
sb.append("<span class='viscontrol'>Shape: <select id='shape' onchange='setShape(this)'><option>Circles</option><option")
- .append(!useCircles ? " selected='true'" : "").append(">Squares</option></select></span>\n");
+ .append(!cfg.useCircles ? " selected='true'" : "").append(">Squares</option></select></span>\n");
// size select box
- sb.append("  <span class='viscontrol'>Size: <select id='size' onchange='setSize(this)'><option").append(spacing == 10 ? " selected='true'" : "")
- .append(">10</option><option").append(spacing == 20 ? " selected='true'" : "").append(">20</option><option")
- .append(spacing == 40 ? " selected='true'" : "").append(">40</option><option").append(spacing == 80 ? " selected='true'" : "")
+ sb.append("  <span class='viscontrol'>Size: <select id='size' onchange='setSize(this)'><option").append(cfg.spacing == 10 ? " selected='true'" : "")
+ .append(">10</option><option").append(cfg.spacing == 20 ? " selected='true'" : "").append(">20</option><option")
+ .append(cfg.spacing == 40 ? " selected='true'" : "").append(">40</option><option").append(cfg.spacing == 80 ? " selected='true'" : "")
.append(">80</option></select></span>\n");
// motion select box
sb.append("  <span class='viscontrol'>Motion: <select id='motion' onchange='setMotion(this)'>");
- addOptions(sb, motion);
+ addOptions(sb, cfg.motion);
sb.append("</select></span>\n");
// color select box
sb.append("  <span class='viscontrol'>Color: <select id='color' onchange='setColor(this)'>");
- addOptions(sb, color);
+ addOptions(sb, cfg.color);
sb.append("</select></span>\n");
sb.append("  <span class='viscontrol'>(hover for info, click for details)</span>");
sb.append("</div>\n\n");
@@ -200,13 +200,13 @@ public class VisServlet extends BasicSer
}
}
- private void doScript(StringBuilder sb, ArrayList<TabletServerStatus> tservers) {
+ private void doScript(StringBuilder sb, VisualizationConfig cfg, ArrayList<TabletServerStatus> tservers) {
// initialization of some javascript variables
sb.append("<script type='text/javascript'>\n");
sb.append("var numCores = " + ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors() + ";\n");
- sb.append("var jsonurl = '" + url + "json';\n");
- sb.append("var visurl = '" + url + "vis';\n");
- sb.append("var serverurl = '" + url + "tservers?s=';\n\n");
+ sb.append("var jsonurl = '" + cfg.url + "json';\n");
+ sb.append("var visurl = '" + cfg.url + "vis';\n");
+ sb.append("var serverurl = '" + cfg.url + "tservers?s=';\n\n");
sb.append("// observable stats that can be connected to motion or color\n");
sb.append("var statNames = {");
for (StatType st : StatType.values())
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReport.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReport.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReport.java Thu Jul 5 21:03:16 2012
@@ -31,8 +31,8 @@ import org.apache.accumulo.core.data.Mut
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.Encoding;
import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.util.MetadataTable;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java Thu Jul 5 21:03:16 2012
@@ -42,10 +42,10 @@ import org.apache.accumulo.core.iterator
import org.apache.accumulo.core.util.LoggingRunnable;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.commons.collections.map.LRUMap;
import org.apache.hadoop.io.Text;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/security/ZKAuthenticator.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/security/ZKAuthenticator.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/security/ZKAuthenticator.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/security/ZKAuthenticator.java Thu Jul 5 21:03:16 2012
@@ -42,10 +42,10 @@ import org.apache.accumulo.core.security
import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.log4j.Logger;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Thu Jul 5 21:03:16 2012
@@ -92,6 +92,7 @@ import org.apache.accumulo.core.util.Loc
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
@@ -120,7 +121,6 @@ import org.apache.accumulo.server.util.M
import org.apache.accumulo.server.util.MetadataTable;
import org.apache.accumulo.server.util.MetadataTable.LogEntry;
import org.apache.accumulo.server.util.TabletOperations;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
@@ -1165,8 +1165,7 @@ public class Tablet {
}
private static SortedMap<String,DataFileValue> lookupDatafiles(AccumuloConfiguration conf, Text locText, FileSystem fs, KeyExtent extent,
- SortedMap<Key,Value> tabletsKeyValues)
- throws IOException {
+ SortedMap<Key,Value> tabletsKeyValues) throws IOException {
Path location = new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId().toString() + locText.toString());
TreeMap<String,DataFileValue> datafiles = new TreeMap<String,DataFileValue>();
@@ -1290,8 +1289,7 @@ public class Tablet {
private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, FileSystem fs,
SortedMap<Key,Value> tabletsKeyValues) throws IOException {
this(tabletServer, location, extent, trm, conf, fs, lookupLogEntries(extent, tabletsKeyValues), lookupDatafiles(tabletServer.getSystemConfiguration(),
- location, fs, extent, tabletsKeyValues),
- lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent,
+ location, fs, extent, tabletsKeyValues), lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent,
tabletsKeyValues), lookupScanFiles(extent, tabletsKeyValues), lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, tabletsKeyValues));
}
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Thu Jul 5 21:03:16 2012
@@ -53,6 +53,7 @@ import java.util.concurrent.Cancellation
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -126,11 +127,15 @@ import org.apache.accumulo.core.util.Dae
import org.apache.accumulo.core.util.LoggingRunnable;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.Stat;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.ClientServiceHandler;
@@ -183,12 +188,10 @@ import org.apache.accumulo.server.util.T
import org.apache.accumulo.server.util.TServerUtils.ServerPort;
import org.apache.accumulo.server.util.time.RelativeTime;
import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.accumulo.start.Platform;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -2569,6 +2572,8 @@ public class TabletServer extends Abstra
private TServer server;
+ private DistributedWorkQueue bulkFailedCopyQ;
+
private static final String METRICS_PREFIX = "tserver";
private static ObjectName OBJECT_NAME = null;
@@ -2708,13 +2713,23 @@ public class TabletServer extends Abstra
}
clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort);
announceExistence();
+
+ ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");
+
+ bulkFailedCopyQ = new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZBULK_FAILED_COPYQ);
+ try {
+ bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(), distWorkQThreadPool);
+ } catch (Exception e1) {
+ throw new RuntimeException("Failed to start distributed work queue for copying ", e1);
+ }
+
try {
- logSorter.startWatchingForRecoveryLogs(getClientAddressString());
+ logSorter.startWatchingForRecoveryLogs(distWorkQThreadPool);
} catch (Exception ex) {
log.error("Error setting watches for recoveries");
throw new RuntimeException(ex);
}
-
+
try {
OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
// Do this because interface not in same package.
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Thu Jul 5 21:03:16 2012
@@ -25,8 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Random;
-import java.util.TimerTask;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.Constants;
@@ -37,65 +35,84 @@ import org.apache.accumulo.core.master.t
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.logger.LogFileKey;
import org.apache.accumulo.server.logger.LogFileValue;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
/**
*
*/
public class LogSorter {
+
private static final Logger log = Logger.getLogger(LogSorter.class);
FileSystem fs;
AccumuloConfiguration conf;
- private Map<String,Work> currentWork = new HashMap<String,Work>();
+ private Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<String,LogProcessor>());
- class Work implements Runnable {
- final String name;
- FSDataInputStream input;
- final String destPath;
- long bytesCopied = -1;
- long sortStart = 0;
- long sortStop = -1;
- private final LogSortNotifier cback;
+ class LogProcessor implements Processor {
- synchronized long getBytesCopied() throws IOException {
- return input == null ? bytesCopied : input.getPos();
+ private FSDataInputStream input;
+ private long bytesCopied = -1;
+ private long sortStart = 0;
+ private long sortStop = -1;
+
+ @Override
+ public Processor newProcessor() {
+ return new LogProcessor();
+ }
+
+ @Override
+ public void process(String child, byte[] data) {
+ String dest = Constants.getRecoveryDir(conf) + "/" + child;
+ String src = new String(data);
+ String name = new Path(src).getName();
+
+ synchronized (currentWork) {
+ if (currentWork.containsKey(name))
+ return;
+ currentWork.put(name, this);
+ }
+
+ try {
+ log.info("Copying " + src + " to " + dest);
+ sort(name, new Path(src), dest);
+ } finally {
+ currentWork.remove(name);
+ }
+
}
- Work(String name, FSDataInputStream input, String destPath, LogSortNotifier cback) {
- this.name = name;
- this.input = input;
- this.destPath = destPath;
- this.cback = cback;
- }
- synchronized boolean finished() {
- return input == null;
- }
- public void run() {
- sortStart = System.currentTimeMillis();
+ public void sort(String name, Path srcPath, String destPath) {
+
+ synchronized (this) {
+ sortStart = System.currentTimeMillis();
+ }
+
String formerThreadName = Thread.currentThread().getName();
int part = 0;
try {
+
+ // the following call does not throw an exception if the file/dir does not exist
+ fs.delete(new Path(destPath), true);
+
+ FSDataInputStream tmpInput = fs.open(srcPath);
+ synchronized (this) {
+ this.input = tmpInput;
+ }
+
final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
Thread.currentThread().setName("Sorting " + name + " for recovery");
while (true) {
- final ArrayList<Pair<LogFileKey, LogFileValue>> buffer = new ArrayList<Pair<LogFileKey, LogFileValue>>();
+ final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<Pair<LogFileKey,LogFileValue>>();
try {
long start = input.getPos();
while (input.getPos() - start < bufferSize) {
@@ -103,29 +120,26 @@ public class LogSorter {
LogFileValue value = new LogFileValue();
key.readFields(input);
value.readFields(input);
- buffer.add(new Pair<LogFileKey, LogFileValue>(key, value));
+ buffer.add(new Pair<LogFileKey,LogFileValue>(key, value));
}
- writeBuffer(buffer, part++);
+ writeBuffer(destPath, buffer, part++);
buffer.clear();
} catch (EOFException ex) {
- writeBuffer(buffer, part++);
+ writeBuffer(destPath, buffer, part++);
break;
}
}
fs.create(new Path(destPath, "finished")).close();
- log.debug("Log copy/sort of " + name + " complete");
+ log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms");
} catch (Throwable t) {
try {
+ // parent dir may not exist
+ fs.mkdirs(new Path(destPath));
fs.create(new Path(destPath, "failed")).close();
} catch (IOException e) {
log.error("Error creating failed flag file " + name, e);
}
log.error(t, t);
- try {
- cback.notice(name, getBytesCopied(), part, getSortTime(), t.toString());
- } catch (Exception ex) {
- log.error("Strange error notifying the master of a logSort problem for file " + name);
- }
} finally {
Thread.currentThread().setName(formerThreadName);
try {
@@ -133,19 +147,13 @@ public class LogSorter {
} catch (IOException e) {
log.error("Error during cleanup sort/copy " + name, e);
}
- sortStop = System.currentTimeMillis();
- synchronized (currentWork) {
- currentWork.remove(name);
- }
- try {
- cback.notice(name, getBytesCopied(), part, getSortTime(), "");
- } catch (Exception ex) {
- log.error("Strange error reporting successful log sort " + name, ex);
+ synchronized (this) {
+ sortStop = System.currentTimeMillis();
}
}
}
- private void writeBuffer(ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+ private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
String path = destPath + String.format("/part-r-%05d", part++);
MapFile.Writer output = new MapFile.Writer(fs.getConf(), fs, path, LogFileKey.class, LogFileValue.class);
try {
@@ -162,7 +170,7 @@ public class LogSorter {
output.close();
}
}
-
+
synchronized void close() throws IOException {
bytesCopied = input.getPos();
input.close();
@@ -177,9 +185,13 @@ public class LogSorter {
}
return 0;
}
- };
+
+ synchronized long getBytesCopied() throws IOException {
+ return input == null ? bytesCopied : input.getPos();
+ }
+ }
- final ThreadPoolExecutor threadPool;
+ ThreadPoolExecutor threadPool;
private Instance instance;
public LogSorter(Instance instance, FileSystem fs, AccumuloConfiguration conf) {
@@ -189,132 +201,16 @@ public class LogSorter {
int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
}
-
- public void startWatchingForRecoveryLogs(final String serverName) throws KeeperException, InterruptedException {
- final String path = ZooUtil.getRoot(instance) + Constants.ZRECOVERY;
- final ZooReaderWriter zoo = ZooReaderWriter.getInstance();
- zoo.mkdirs(path);
- List<String> children = zoo.getChildren(path, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- switch (event.getType()) {
- case NodeChildrenChanged:
- if (event.getPath().equals(path))
- try {
- attemptRecoveries(zoo, serverName, path, zoo.getChildren(path, this));
- } catch (KeeperException e) {
- log.error("Unable to get recovery information", e);
- } catch (InterruptedException e) {
- log.info("Interrupted getting recovery information", e);
- }
- else
- log.info("Unexpected path for NodeChildrenChanged event " + event.getPath());
- break;
- case NodeCreated:
- case NodeDataChanged:
- case NodeDeleted:
- case None:
- log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path);
- break;
-
- }
- }
- });
- attemptRecoveries(zoo, serverName, path, children);
- Random r = new Random();
- // Add a little jitter to avoid all the tservers slamming zookeeper at once
- SimpleTimer.getInstance().schedule(new TimerTask() {
- @Override
- public void run() {
- try {
- attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
- } catch (KeeperException e) {
- log.error("Unable to get recovery information", e);
- } catch (InterruptedException e) {
- log.info("Interrupted getting recovery information", e);
- }
- }
- }, r.nextInt(1000), 60 * 1000);
- }
-
- private void attemptRecoveries(final ZooReaderWriter zoo, final String serverName, final String path, List<String> children) {
- if (children.size() == 0)
- return;
-
- if (threadPool.getQueue().size() > 1)
- return;
- log.debug("Zookeeper references " + children.size() + " recoveries, attempting locks");
- Random random = new Random();
- Collections.shuffle(children, random);
- try {
- for (String child : children) {
- final String childPath = path + "/" + child;
- log.debug("Attempting to lock " + child);
- ZooLock lock = new ZooLock(childPath);
- if (lock.tryLock(new LockWatcher() {
- @Override
- public void lostLock(LockLossReason reason) {
- log.info("Ignoring lost lock event, reason " + reason);
- }
- }, serverName.getBytes())) {
- // Great... we got the lock, but maybe we're too busy
- if (threadPool.getQueue().size() > 1) {
- lock.unlock();
- log.debug("got the lock, but thread pool is busy; released the lock on " + child);
- break;
- }
- log.debug("got lock for " + child);
- byte[] contents = zoo.getData(childPath, null);
- String destination = Constants.getRecoveryDir(conf) + "/" + child;
- startSort(new String(contents), destination, new LogSortNotifier() {
- @Override
- public void notice(String name, long bytes, int parts, long milliseconds, String error) {
- log.info("Finished log sort " + name + " " + bytes + " bytes " + parts + " parts in " + milliseconds + "ms");
- try {
- zoo.recursiveDelete(childPath, NodeMissingPolicy.SKIP);
- } catch (Exception e) {
- log.error("Error received when trying to delete recovery entry in zookeeper " + childPath);
- }
- try {
- attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
- } catch (KeeperException e) {
- log.error("Unable to get recovery information", e);
- } catch (InterruptedException e) {
- log.info("Interrupted getting recovery information", e);
- }
- }
- });
- } else {
- log.debug("failed to get the lock " + child);
- }
- }
- } catch (Throwable t) {
- log.error("Unexpected error", t);
- }
- }
-
- public interface LogSortNotifier {
- public void notice(String name, long bytes, int parts, long milliseconds, String error);
- }
-
- private void startSort(String src, String dest, LogSortNotifier cback) throws IOException {
- log.info("Copying " + src + " to " + dest);
- fs.delete(new Path(dest), true);
- Path srcPath = new Path(src);
- synchronized (currentWork) {
- Work work = new Work(srcPath.getName(), fs.open(srcPath), dest, cback);
- if (!currentWork.containsKey(srcPath.getName())) {
- threadPool.execute(work);
- currentWork.put(srcPath.getName(), work);
- }
- }
+ public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException {
+ this.threadPool = distWorkQThreadPool;
+ new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZRECOVERY).startProcessing(new LogProcessor(), this.threadPool);
}
public List<RecoveryStatus> getLogSorts() {
List<RecoveryStatus> result = new ArrayList<RecoveryStatus>();
synchronized (currentWork) {
- for (Entry<String,Work> entries : currentWork.entrySet()) {
+ for (Entry<String,LogProcessor> entries : currentWork.entrySet()) {
RecoveryStatus status = new RecoveryStatus();
status.name = entries.getKey();
try {
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java Thu Jul 5 21:03:16 2012
@@ -45,7 +45,7 @@ import org.apache.accumulo.core.security
import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.core.trace.DistributedTrace;
import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.zookeeper.ZooReader;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.security.Authenticator;
import org.apache.accumulo.server.security.ZKAuthenticator;
@@ -258,7 +258,7 @@ public class TestIngest {
try {
if (ingestArgs.trace) {
String name = TestIngest.class.getSimpleName();
- DistributedTrace.enable(instance, new ZooReader(instance), name, null);
+ DistributedTrace.enable(instance, new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()), name, null);
Trace.on(name);
Trace.currentTrace().data("cmdLine", Arrays.asList(args).toString());
}
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/VerifyIngest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/VerifyIngest.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/VerifyIngest.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/VerifyIngest.java Thu Jul 5 21:03:16 2012
@@ -33,7 +33,7 @@ import org.apache.accumulo.core.data.Val
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.trace.DistributedTrace;
import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.zookeeper.ZooReader;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.test.TestIngest.IngestArgs;
import org.apache.hadoop.io.Text;
@@ -60,7 +60,7 @@ public class VerifyIngest {
try {
if (ingestArgs.trace) {
String name = VerifyIngest.class.getSimpleName();
- DistributedTrace.enable(instance, new ZooReader(instance), name, null);
+ DistributedTrace.enable(instance, new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()), name, null);
Trace.on(name);
Trace.currentTrace().data("cmdLine", Arrays.asList(args).toString());
}
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java Thu Jul 5 21:03:16 2012
@@ -80,46 +80,46 @@ public class UndefinedAnalyzer {
private void parseLog(File log) throws Exception {
BufferedReader reader = new BufferedReader(new FileReader(log));
-
String line;
TreeMap<Long,Long> tm = null;
-
- while ((line = reader.readLine()) != null) {
- if (!line.startsWith("UUID"))
- continue;
- String[] tokens = line.split("\\s");
- String time = tokens[1];
- String uuid = tokens[2];
-
- if (flushes.containsKey(uuid)) {
- System.err.println("WARN Duplicate uuid " + log);
+ try {
+ while ((line = reader.readLine()) != null) {
+ if (!line.startsWith("UUID"))
+ continue;
+ String[] tokens = line.split("\\s");
+ String time = tokens[1];
+ String uuid = tokens[2];
+
+ if (flushes.containsKey(uuid)) {
+ System.err.println("WARN Duplicate uuid " + log);
+ return;
+ }
+
+ tm = new TreeMap<Long,Long>(Collections.reverseOrder());
+ tm.put(0l, Long.parseLong(time));
+ flushes.put(uuid, tm);
+ break;
+
+ }
+ if (tm == null) {
+ System.err.println("WARN Bad ingest log " + log);
return;
}
- tm = new TreeMap<Long,Long>(Collections.reverseOrder());
- tm.put(0l, Long.parseLong(time));
- flushes.put(uuid, tm);
- break;
-
- }
-
- if (tm == null) {
- System.err.println("WARN Bad ingest log " + log);
- return;
- }
-
- while ((line = reader.readLine()) != null) {
- String[] tokens = line.split("\\s");
-
- if (!tokens[0].equals("FLUSH"))
- continue;
-
- String time = tokens[1];
- String count = tokens[4];
-
- tm.put(Long.parseLong(count), Long.parseLong(time));
+ while ((line = reader.readLine()) != null) {
+ String[] tokens = line.split("\\s");
+
+ if (!tokens[0].equals("FLUSH"))
+ continue;
+
+ String time = tokens[1];
+ String count = tokens[4];
+
+ tm.put(Long.parseLong(count), Long.parseLong(time));
+ }
+ } finally {
+ reader.close();
}
-
}
Iterator<Long> getTimes(String uuid, long count) {
@@ -172,45 +172,49 @@ public class UndefinedAnalyzer {
BufferedReader reader = new BufferedReader(new FileReader(masterLog));
String line;
- while ((line = reader.readLine()) != null) {
- if (line.contains("TABLET_LOADED")) {
- String[] tokens = line.split("\\s+");
- String tablet = tokens[8];
- String server = tokens[10];
-
- int pos1 = -1;
- int pos2 = -1;
- int pos3 = -1;
-
- for (int i = 0; i < tablet.length(); i++) {
- if (tablet.charAt(i) == '<' || tablet.charAt(i) == ';') {
- if (pos1 == -1) {
- pos1 = i;
- } else if (pos2 == -1) {
- pos2 = i;
- } else {
- pos3 = i;
+ try {
+ while ((line = reader.readLine()) != null) {
+ if (line.contains("TABLET_LOADED")) {
+ String[] tokens = line.split("\\s+");
+ String tablet = tokens[8];
+ String server = tokens[10];
+
+ int pos1 = -1;
+ int pos2 = -1;
+ int pos3 = -1;
+
+ for (int i = 0; i < tablet.length(); i++) {
+ if (tablet.charAt(i) == '<' || tablet.charAt(i) == ';') {
+ if (pos1 == -1) {
+ pos1 = i;
+ } else if (pos2 == -1) {
+ pos2 = i;
+ } else {
+ pos3 = i;
+ }
}
}
- }
-
- if (pos1 > 0 && pos2 > 0 && pos3 == -1) {
- String tid = tablet.substring(0, pos1);
- String endRow = tablet.charAt(pos1) == '<' ? "8000000000000000" : tablet.substring(pos1 + 1, pos2);
- String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
- if (tid.equals(tableId)) {
- // System.out.println(" "+server+" "+tid+" "+endRow+" "+prevEndRow);
- Date date = sdf.parse(tokens[0] + " " + tokens[1] + " " + currentYear + " " + currentMonth);
- // System.out.println(" "+date);
-
- assignments.add(new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
-
+
+ if (pos1 > 0 && pos2 > 0 && pos3 == -1) {
+ String tid = tablet.substring(0, pos1);
+ String endRow = tablet.charAt(pos1) == '<' ? "8000000000000000" : tablet.substring(pos1 + 1, pos2);
+ String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
+ if (tid.equals(tableId)) {
+ // System.out.println(" "+server+" "+tid+" "+endRow+" "+prevEndRow);
+ Date date = sdf.parse(tokens[0] + " " + tokens[1] + " " + currentYear + " " + currentMonth);
+ // System.out.println(" "+date);
+
+ assignments.add(new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
+
+ }
+ } else if (!tablet.startsWith("!0")) {
+ System.err.println("Cannot parse tablet " + tablet);
}
- } else if (!tablet.startsWith("!0")) {
- System.err.println("Cannot parse tablet " + tablet);
+
}
-
}
+ } finally {
+ reader.close();
}
}
}
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestClean.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestClean.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestClean.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestClean.java Thu Jul 5 21:03:16 2012
@@ -18,8 +18,8 @@ package org.apache.accumulo.server.test.
import java.io.File;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
public class CacheTestClean {
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java Thu Jul 5 21:03:16 2012
@@ -25,7 +25,7 @@ import java.util.TreeMap;
import java.util.UUID;
import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.zookeeper.ZooCache;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
public class CacheTestReader {
public static void main(String[] args) throws Exception {
@@ -70,6 +70,7 @@ public class CacheTestReader {
oos.writeObject(readData);
+ fos.close();
oos.close();
UtilWaitThread.sleep(20);
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java Thu Jul 5 21:03:16 2012
@@ -27,9 +27,9 @@ import java.util.TreeMap;
import java.util.UUID;
import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
public class CacheTestWriter {
@@ -132,6 +132,7 @@ public class CacheTestWriter {
@SuppressWarnings("unchecked")
Map<String,String> readerMap = (Map<String,String>) ois.readObject();
+ fis.close();
ois.close();
System.out.println("read " + readerMap);
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java Thu Jul 5 21:03:16 2012
@@ -38,7 +38,10 @@ import org.apache.accumulo.core.file.rfi
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.master.state.Assignment;
import org.apache.accumulo.server.master.state.TServerInstance;
@@ -46,10 +49,7 @@ import org.apache.accumulo.server.securi
import org.apache.accumulo.server.tabletserver.TabletServer;
import org.apache.accumulo.server.tabletserver.TabletTime;
import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.io.Text;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java Thu Jul 5 21:03:16 2012
@@ -34,14 +34,14 @@ import org.apache.accumulo.core.util.Ser
import org.apache.accumulo.core.util.ServerServices.Service;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.util.TServerUtils;
import org.apache.accumulo.server.util.TServerUtils.ServerPort;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java Thu Jul 5 21:03:16 2012
@@ -103,7 +103,9 @@ public class Framework {
String module = args[3];
Properties props = new Properties();
- props.load(new FileInputStream(configDir + "/randomwalk.conf"));
+ FileInputStream fis = new FileInputStream(configDir + "/randomwalk.conf");
+ props.load(fis);
+ fis.close();
System.setProperty("localLog", localLogPath + "/" + logId);
System.setProperty("nfsLog", props.getProperty("NFS_LOGPATH") + "/" + logId);
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java Thu Jul 5 21:03:16 2012
@@ -139,7 +139,8 @@ public class State {
files = libdir.list();
for (int i = 0; i < files.length; i++) {
String f = files[i];
- if (f.matches("^accumulo-core-.+jar$") || f.matches("^accumulo-server-.+jar$") || f.matches("^cloudtrace-.+jar$") || f.matches("^libthrift-.+jar$")) {
+ if (f.matches("^accumulo-core-.+jar$") || f.matches("^accumulo-server-.+jar$") || f.matches("^accumulo-fate-.+jar$") || f.matches("^cloudtrace-.+jar$")
+ || f.matches("^libthrift-.+jar$")) {
if (retval == null) {
retval = String.format("%s/%s", libdir.getAbsolutePath(), f);
} else {
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java Thu Jul 5 21:03:16 2012
@@ -31,10 +31,6 @@ import org.apache.accumulo.server.test.r
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
-/**
- * @author jwvines
- *
- */
public class SecurityHelper {
protected final static Logger log = Logger.getLogger(SecurityHelper.class);
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java Thu Jul 5 21:03:16 2012
@@ -52,8 +52,11 @@ public class Run {
Properties scaleProps = new Properties();
Properties testProps = new Properties();
try {
- scaleProps.load(new FileInputStream(sitePath));
- testProps.load(new FileInputStream(testPath));
+ FileInputStream fis = new FileInputStream(sitePath);
+ scaleProps.load(fis);
+ fis.close();
+ fis = new FileInputStream(testPath);
+ testProps.load(fis);
} catch (Exception e) {
System.out.println("Problem loading config file");
e.printStackTrace();
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java Thu Jul 5 21:03:16 2012
@@ -39,12 +39,12 @@ import org.apache.accumulo.core.util.Add
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;