Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:55:57 UTC

[18/54] [partial] ACCUMULO-658, ACCUMULO-656 Split server into separate modules

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java b/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
deleted file mode 100644
index 6dd9d2c..0000000
--- a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
+++ /dev/null
@@ -1,578 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.gc;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.impl.Tables;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface;
-import org.apache.accumulo.core.gc.thrift.GCMonitorService.Processor;
-import org.apache.accumulo.core.gc.thrift.GCStatus;
-import org.apache.accumulo.core.gc.thrift.GcCycleStats;
-import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.core.security.SecurityUtil;
-import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.core.util.NamingThreadFactory;
-import org.apache.accumulo.core.util.ServerServices;
-import org.apache.accumulo.core.util.ServerServices.Service;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
-import org.apache.accumulo.server.Accumulo;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.ServerOpts;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManager.FileType;
-import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.master.state.tables.TableManager;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.server.util.Halt;
-import org.apache.accumulo.server.util.TServerUtils;
-import org.apache.accumulo.server.util.TabletIterator;
-import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.trace.instrument.CountSampler;
-import org.apache.accumulo.trace.instrument.Sampler;
-import org.apache.accumulo.trace.instrument.Span;
-import org.apache.accumulo.trace.instrument.Trace;
-import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
-import org.apache.accumulo.trace.thrift.TInfo;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-
-import com.beust.jcommander.Parameter;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.net.HostAndPort;
-
-public class SimpleGarbageCollector implements Iface {
-  private static final Text EMPTY_TEXT = new Text();
-  
-  static class Opts extends ServerOpts {
-    @Parameter(names = {"-v", "--verbose"}, description = "extra information will get printed to stdout also")
-    boolean verbose = false;
-    @Parameter(names = {"-s", "--safemode"}, description = "safe mode will not delete files")
-    boolean safeMode = false;
-  }
-  
-  // how much of the JVM's maximum memory to use while gathering candidates
-  private static final float CANDIDATE_MEMORY_PERCENTAGE = 0.75f;
-
-  private static final Logger log = Logger.getLogger(SimpleGarbageCollector.class);
-  
-  private Credentials credentials;
-  private long gcStartDelay;
-  private VolumeManager fs;
-  private boolean useTrash = true;
-  private Opts opts = new Opts();
-  private ZooLock lock;
-  
-  private GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());
-  
-  private int numDeleteThreads;
-  
-  private Instance instance;
-  
-  public static void main(String[] args) throws UnknownHostException, IOException {
-    SecurityUtil.serverLogin();
-    Instance instance = HdfsZooInstance.getInstance();
-    ServerConfiguration serverConf = new ServerConfiguration(instance);
-    final VolumeManager fs = VolumeManagerImpl.get();
-    Accumulo.init(fs, serverConf, "gc");
-    Opts opts = new Opts();
-    opts.parseArgs("gc", args);
-    SimpleGarbageCollector gc = new SimpleGarbageCollector(opts);
-    
-    gc.init(fs, instance, SystemCredentials.get(), serverConf.getConfiguration().getBoolean(Property.GC_TRASH_IGNORE));
-    Accumulo.enableTracing(opts.getAddress(), "gc");
-    gc.run();
-  }
-  
-  public SimpleGarbageCollector(Opts opts) {
-    this.opts = opts;
-  }
-  
-  public void init(VolumeManager fs, Instance instance, Credentials credentials, boolean noTrash) throws IOException {
-    this.fs = fs;
-    this.credentials = credentials;
-    this.instance = instance;
-    
-    gcStartDelay = instance.getConfiguration().getTimeInMillis(Property.GC_CYCLE_START);
-    long gcDelay = instance.getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY);
-    numDeleteThreads = instance.getConfiguration().getCount(Property.GC_DELETE_THREADS);
-    log.info("start delay: " + gcStartDelay + " milliseconds");
-    log.info("time delay: " + gcDelay + " milliseconds");
-    log.info("safemode: " + opts.safeMode);
-    log.info("verbose: " + opts.verbose);
-    log.info("memory threshold: " + CANDIDATE_MEMORY_PERCENTAGE + " of " + Runtime.getRuntime().maxMemory() + " bytes");
-    log.info("delete threads: " + numDeleteThreads);
-    useTrash = !noTrash;
-  }
-  
-  private class GCEnv implements GarbageCollectionEnvironment {
-
-    private String tableName;
-
-    GCEnv(String tableName) {
-      this.tableName = tableName;
-    }
-
-    @Override
-    public List<String> getCandidates(String continuePoint) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-      // want to ensure GC makes progress... if the 1st N deletes are stable and we keep processing them,
-      // then we will never inspect deletes after N
-      Range range = MetadataSchema.DeletesSection.getRange();
-      if (continuePoint != null && !continuePoint.isEmpty()) {
-        String continueRow = MetadataSchema.DeletesSection.getRowPrefix() + continuePoint;
-        range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive());
-      }
-
-      Scanner scanner = instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createScanner(tableName, Authorizations.EMPTY);
-      scanner.setRange(range);
-      List<String> result = new ArrayList<String>();
-      // find candidates for deletion; chop off the prefix
-      for (Entry<Key,Value> entry : scanner) {
-        String cand = entry.getKey().getRow().toString().substring(MetadataSchema.DeletesSection.getRowPrefix().length());
-        result.add(cand);
-        if (almostOutOfMemory()) {
-          log.info("List of delete candidates has exceeded the memory threshold. Attempting to delete what has been gathered so far.");
-          break;
-        }
-      }
-
-      return result;
-
-    }
-
-    @Override
-    public Iterator<String> getBlipIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-      IsolatedScanner scanner = new IsolatedScanner(instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createScanner(tableName,
-          Authorizations.EMPTY));
-
-      scanner.setRange(MetadataSchema.BlipSection.getRange());
-
-      return Iterators.transform(scanner.iterator(), new Function<Entry<Key,Value>,String>() {
-        @Override
-        public String apply(Entry<Key,Value> entry) {
-          return entry.getKey().getRow().toString().substring(MetadataSchema.BlipSection.getRowPrefix().length());
-        }
-      });
-    }
-
-    @Override
-    public Iterator<Entry<Key,Value>> getReferenceIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-      IsolatedScanner scanner = new IsolatedScanner(instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createScanner(tableName,
-          Authorizations.EMPTY));
-      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-      scanner.fetchColumnFamily(ScanFileColumnFamily.NAME);
-      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
-      TabletIterator tabletIterator = new TabletIterator(scanner, MetadataSchema.TabletsSection.getRange(), false, true);
-
-      return Iterators.concat(Iterators.transform(tabletIterator, new Function<Map<Key,Value>,Iterator<Entry<Key,Value>>>() {
-        @Override
-        public Iterator<Entry<Key,Value>> apply(Map<Key,Value> input) {
-          return input.entrySet().iterator();
-        }
-      }));
-    }
-
-    @Override
-    public Set<String> getTableIDs() {
-      return Tables.getIdToNameMap(instance).keySet();
-    }
-
-    @Override
-    public void delete(SortedMap<String,String> confirmedDeletes) throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
-
-      if (opts.safeMode) {
-        if (opts.verbose)
-          System.out.println(String.format("SAFEMODE: There are %d data file candidates marked for deletion.%n"
-              + "          Examine the log files to identify them.", confirmedDeletes.size()));
-        log.info("SAFEMODE: Listing all data file candidates for deletion");
-        for (String s : confirmedDeletes.values())
-          log.info("SAFEMODE: " + s);
-        log.info("SAFEMODE: End candidates for deletion");
-        return;
-      }
-
-      Connector c = instance.getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken());
-      BatchWriter writer = c.createBatchWriter(tableName, new BatchWriterConfig());
-
-      // when deleting a dir and all files in that dir, only need to delete the dir
-      // the dir will sort right before the files... so remove the files in this case
-      // to minimize namenode ops
-      Iterator<Entry<String,String>> cdIter = confirmedDeletes.entrySet().iterator();
-
-      String lastDir = null;
-      while (cdIter.hasNext()) {
-        Entry<String,String> entry = cdIter.next();
-        String relPath = entry.getKey();
-        String absPath = fs.getFullPath(FileType.TABLE, entry.getValue()).toString();
-
-        if (isDir(relPath)) {
-          lastDir = absPath;
-        } else if (lastDir != null) {
-          if (absPath.startsWith(lastDir)) {
-            log.debug("Ignoring " + entry.getValue() + " because " + lastDir + " exist");
-            try {
-              putMarkerDeleteMutation(entry.getValue(), writer);
-            } catch (MutationsRejectedException e) {
-              throw new RuntimeException(e);
-            }
-            cdIter.remove();
-          } else {
-            lastDir = null;
-          }
-        }
-      }
-
-      final BatchWriter finalWriter = writer;
-
-      ExecutorService deleteThreadPool = Executors.newFixedThreadPool(numDeleteThreads, new NamingThreadFactory("deleting"));
-
-      for (final String delete : confirmedDeletes.values()) {
-
-        Runnable deleteTask = new Runnable() {
-          @Override
-          public void run() {
-            boolean removeFlag;
-
-            try {
-              Path fullPath = fs.getFullPath(FileType.TABLE, delete);
-
-              log.debug("Deleting " + fullPath);
-
-              if (moveToTrash(fullPath) || fs.deleteRecursively(fullPath)) {
-                // delete succeeded; remove the candidate entry from the METADATA table
-                removeFlag = true;
-                synchronized (SimpleGarbageCollector.this) {
-                  ++status.current.deleted;
-                }
-              } else if (fs.exists(fullPath)) {
-                // leave the entry in the METADATA table; we'll try again
-                // later
-                removeFlag = false;
-                synchronized (SimpleGarbageCollector.this) {
-                  ++status.current.errors;
-                }
-                log.warn("File exists, but was not deleted for an unknown reason: " + fullPath);
-              } else {
-                // despite this failure, we still want to remove the METADATA table
-                // entry
-                removeFlag = true;
-                synchronized (SimpleGarbageCollector.this) {
-                  ++status.current.errors;
-                }
-                String parts[] = delete.split("/");
-                if (parts.length > 2) {
-                  String tableId = parts[parts.length - 3];
-                  String tabletDir = parts[parts.length - 2];
-                  TableManager.getInstance().updateTableStateCache(tableId);
-                  TableState tableState = TableManager.getInstance().getTableState(tableId);
-                  if (tableState != null && tableState != TableState.DELETING) {
-                    // clone directories don't always exist
-                    if (!tabletDir.startsWith("c-"))
-                      log.warn("File doesn't exist: " + fullPath);
-                  }
-                } else {
-                  log.warn("Very strange path name: " + delete);
-                }
-              }
-
-              // proceed to clearing out the flags for successful deletes and
-              // non-existent files
-              if (removeFlag && finalWriter != null) {
-                putMarkerDeleteMutation(delete, finalWriter);
-              }
-            } catch (Exception e) {
-              log.error(e, e);
-            }
-
-          }
-
-        };
-
-        deleteThreadPool.execute(deleteTask);
-      }
-
-      deleteThreadPool.shutdown();
-
-      try {
-        while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) {}
-      } catch (InterruptedException e1) {
-        log.error(e1, e1);
-      }
-
-      if (writer != null) {
-        try {
-          writer.close();
-        } catch (MutationsRejectedException e) {
-          log.error("Problem removing entries from the metadata table: ", e);
-        }
-      }
-    }
-
-    @Override
-    public void deleteTableDirIfEmpty(String tableID) throws IOException {
-      // if dir exist and is empty, then empty list is returned...
-      // hadoop 1.0 will return null if the file doesn't exist
-      // hadoop 2.0 will throw an exception if the file does not exist
-      for (String dir : ServerConstants.getTablesDirs()) {
-        FileStatus[] tabletDirs = null;
-        try {
-          tabletDirs = fs.listStatus(new Path(dir + "/" + tableID));
-        } catch (FileNotFoundException ex) {
-          // ignored
-        }
-        if (tabletDirs == null)
-          continue;
-
-        if (tabletDirs.length == 0) {
-          Path p = new Path(dir + "/" + tableID);
-          log.debug("Removing table dir " + p);
-          if (!moveToTrash(p))
-            fs.delete(p);
-        }
-      }
-    }
-
-    @Override
-    public void incrementCandidatesStat(long i) {
-      status.current.candidates += i;
-    }
-
-    @Override
-    public void incrementInUseStat(long i) {
-      status.current.inUse += i;
-    }
-
-  }
-
-  private void run() {
-    long tStart, tStop;
-    
-    // Sleep for an initial period, giving the master time to start up and
-    // old data files to fall out of use
-      
-    try {
-      getZooLock(startStatsService());
-    } catch (Exception ex) {
-      log.error(ex, ex);
-      System.exit(1);
-    }
-
-    try {
-      log.debug("Sleeping for " + gcStartDelay + " milliseconds before beginning garbage collection cycles");
-      Thread.sleep(gcStartDelay);
-    } catch (InterruptedException e) {
-      log.warn(e, e);
-      return;
-    }
-    
-    Sampler sampler = new CountSampler(100);
-    
-    while (true) {
-      if (sampler.next())
-        Trace.on("gc");
-      
-      Span gcSpan = Trace.start("loop");
-      tStart = System.currentTimeMillis();
-      try {
-        System.gc(); // make room
-
-        status.current.started = System.currentTimeMillis();
-
-        new GarbageCollectionAlgorithm().collect(new GCEnv(RootTable.NAME));
-        new GarbageCollectionAlgorithm().collect(new GCEnv(MetadataTable.NAME));
-
-        log.info("Number of data file candidates for deletion: " + status.current.candidates);
-        log.info("Number of data file candidates still in use: " + status.current.inUse);
-        log.info("Number of successfully deleted data files: " + status.current.deleted);
-        log.info("Number of data files delete failures: " + status.current.errors);
-
-        status.current.finished = System.currentTimeMillis();
-        status.last = status.current;
-        status.current = new GcCycleStats();
-        
-      } catch (Exception e) {
-        log.error(e, e);
-      }
-
-      tStop = System.currentTimeMillis();
-      log.info(String.format("Collect cycle took %.2f seconds", ((tStop - tStart) / 1000.0)));
-      
-      // Clean up any unused write-ahead logs
-      Span waLogs = Trace.start("walogs");
-      try {
-        GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs, useTrash);
-        log.info("Beginning garbage collection of write-ahead logs");
-        walogCollector.collect(status);
-      } catch (Exception e) {
-        log.error(e, e);
-      } finally {
-        waLogs.stop();
-      }
-      gcSpan.stop();
-      
-      // we just made a lot of changes to the !METADATA table: flush them out
-      try {
-        Connector connector = instance.getConnector(credentials.getPrincipal(), credentials.getToken());
-        connector.tableOperations().compact(MetadataTable.NAME, null, null, true, true);
-        connector.tableOperations().compact(RootTable.NAME, null, null, true, true);
-      } catch (Exception e) {
-        log.warn(e, e);
-      }
-      
-      Trace.offNoFlush();
-      try {
-        long gcDelay = instance.getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY);
-        log.debug("Sleeping for " + gcDelay + " milliseconds");
-        Thread.sleep(gcDelay);
-      } catch (InterruptedException e) {
-        log.warn(e, e);
-        return;
-      }
-    }
-  }
-  
-  private boolean moveToTrash(Path path) throws IOException {
-    if (!useTrash)
-      return false;
-    try {
-      return fs.moveToTrash(path);
-    } catch (FileNotFoundException ex) {
-      return false;
-    }
-  }
-  
-  private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException {
-    String path = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZGC_LOCK;
-    
-    LockWatcher lockWatcher = new LockWatcher() {
-      @Override
-      public void lostLock(LockLossReason reason) {
-        Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!");
-      }
-      
-      @Override
-      public void unableToMonitorLockNode(final Throwable e) {
-        Halt.halt(-1, new Runnable() {
-          
-          @Override
-          public void run() {
-            log.fatal("No longer able to monitor lock node ", e);
-          }
-        });
-        
-      }
-    };
-    
-    while (true) {
-      lock = new ZooLock(path);
-      if (lock.tryLock(lockWatcher, new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes())) {
-        break;
-      }
-      UtilWaitThread.sleep(1000);
-    }
-  }
-  
-  private HostAndPort startStatsService() throws UnknownHostException {
-    Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(this));
-    int port = instance.getConfiguration().getPort(Property.GC_PORT);
-    long maxMessageSize = instance.getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
-    HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port);
-    try {
-      port = TServerUtils.startTServer(result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize).address.getPort();
-    } catch (Exception ex) {
-      log.fatal(ex, ex);
-      throw new RuntimeException(ex);
-    }
-    return result;
-  }
-  
-
-  public static boolean almostOutOfMemory() {
-    Runtime runtime = Runtime.getRuntime();
-    return runtime.totalMemory() - runtime.freeMemory() > CANDIDATE_MEMORY_PERCENTAGE * runtime.maxMemory();
-  }
-  
-  
-  final static String METADATA_TABLE_DIR = "/" + MetadataTable.ID;
-  
-  private static void putMarkerDeleteMutation(final String delete, final BatchWriter writer)
-      throws MutationsRejectedException {
-    Mutation m = new Mutation(MetadataSchema.DeletesSection.getRowPrefix() + delete);
-    m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
-    writer.addMutation(m);
-  }
-  
-  
-  private boolean isDir(String delete) {
-    int slashCount = 0;
-    for (int i = 0; i < delete.length(); i++)
-      if (delete.charAt(i) == '/')
-        slashCount++;
-    return slashCount == 1;
-  }
-  
-  @Override
-  public GCStatus getStatus(TInfo info, TCredentials credentials) {
-    return status;
-  }
-}
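
A note on GCEnv.delete() above: it leans on sort order, since a tablet
directory sorts immediately before the files inside it, so one recursive
delete of the directory covers those files and saves namenode operations.
Below is a minimal, self-contained sketch of that pruning step (not part of
this commit; the DirPrefixPrune class name and the example paths are
illustrative only):

import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class DirPrefixPrune {
  // a relative path with exactly one slash (e.g. "1b/t-0001") names a tablet
  // directory, mirroring SimpleGarbageCollector.isDir() above
  static boolean isDir(String relPath) {
    int slashCount = 0;
    for (int i = 0; i < relPath.length(); i++)
      if (relPath.charAt(i) == '/')
        slashCount++;
    return slashCount == 1;
  }

  public static void main(String[] args) {
    // candidate deletions keyed by relative path, sorted as in the metadata table
    TreeMap<String,String> candidates = new TreeMap<String,String>();
    candidates.put("1b/t-0001", "/accumulo/tables/1b/t-0001");
    candidates.put("1b/t-0001/F0000.rf", "/accumulo/tables/1b/t-0001/F0000.rf");
    candidates.put("1b/t-0002/F0001.rf", "/accumulo/tables/1b/t-0002/F0001.rf");

    String lastDir = null;
    Iterator<Map.Entry<String,String>> it = candidates.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String,String> entry = it.next();
      if (isDir(entry.getKey())) {
        lastDir = entry.getValue();
      } else if (lastDir != null && entry.getValue().startsWith(lastDir)) {
        it.remove(); // deleting the directory recursively removes this file too
      } else {
        lastDir = null;
      }
    }
    System.out.println(candidates.keySet()); // [1b/t-0001, 1b/t-0002/F0001.rf]
  }
}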

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java b/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
deleted file mode 100644
index d8bcebe..0000000
--- a/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.iterators;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.Filter;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
-import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
-import org.apache.log4j.Logger;
-
-/**
- * A special iterator for the metadata table that removes inactive bulk load flags
- * 
- */
-public class MetadataBulkLoadFilter extends Filter {
-  private static Logger log = Logger.getLogger(MetadataBulkLoadFilter.class);
-  
-  enum Status {
-    ACTIVE, INACTIVE
-  }
-  
-  Map<Long,Status> bulkTxStatusCache;
-  Arbitrator arbitrator;
-  
-  @Override
-  public boolean accept(Key k, Value v) {
-    if (!k.isDeleted() && k.compareColumnFamily(TabletsSection.BulkFileColumnFamily.NAME) == 0) {
-      long txid = Long.valueOf(v.toString());
-      
-      Status status = bulkTxStatusCache.get(txid);
-      if (status == null) {
-        try {
-          if (arbitrator.transactionComplete(Constants.BULK_ARBITRATOR_TYPE, txid)) {
-            status = Status.INACTIVE;
-          } else {
-            status = Status.ACTIVE;
-          }
-        } catch (Exception e) {
-          status = Status.ACTIVE;
-          log.error(e, e);
-        }
-        
-        bulkTxStatusCache.put(txid, status);
-      }
-      
-      return status == Status.ACTIVE;
-    }
-    
-    return true;
-  }
-  
-  @Override
-  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
-    super.init(source, options, env);
-    
-    if (env.getIteratorScope() == IteratorScope.scan) {
-      throw new IOException("This iterator not intended for use at scan time");
-    }
-    
-    bulkTxStatusCache = new HashMap<Long,MetadataBulkLoadFilter.Status>();
-    arbitrator = getArbitrator();
-  }
-  
-  protected Arbitrator getArbitrator() {
-    return new ZooArbitrator();
-  }
-}
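
The accept() method above asks the arbitrator about each bulk-load
transaction id at most once per iterator lifetime and memoizes the answer,
treating errors as ACTIVE so flags are never dropped prematurely. A hedged,
standalone sketch of that caching pattern follows (the BulkTxCache class,
the "bulk" type string standing in for Constants.BULK_ARBITRATOR_TYPE, and
the stub arbitrator are all illustrative):

import java.util.HashMap;
import java.util.Map;

public class BulkTxCache {
  interface Arbitrator {
    boolean transactionComplete(String type, long txid) throws Exception;
  }

  enum Status {
    ACTIVE, INACTIVE
  }

  private final Map<Long,Status> cache = new HashMap<Long,Status>();
  private final Arbitrator arbitrator;

  BulkTxCache(Arbitrator arbitrator) {
    this.arbitrator = arbitrator;
  }

  Status statusOf(long txid) {
    Status status = cache.get(txid);
    if (status == null) {
      try {
        // a finished transaction means the bulk flag is stale and can be filtered
        status = arbitrator.transactionComplete("bulk", txid) ? Status.INACTIVE : Status.ACTIVE;
      } catch (Exception e) {
        status = Status.ACTIVE; // on error, keep the flag and err on the safe side
      }
      cache.put(txid, status);
    }
    return status;
  }

  public static void main(String[] args) {
    // stub arbitrator: transaction ids below 100 are considered complete
    BulkTxCache cache = new BulkTxCache(new Arbitrator() {
      @Override
      public boolean transactionComplete(String type, long txid) {
        return txid < 100;
      }
    });
    System.out.println(cache.statusOf(42));  // INACTIVE
    System.out.println(cache.statusOf(123)); // ACTIVE
  }
}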

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/logger/LogEvents.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/logger/LogEvents.java b/server/src/main/java/org/apache/accumulo/server/logger/LogEvents.java
deleted file mode 100644
index 8cbe033..0000000
--- a/server/src/main/java/org/apache/accumulo/server/logger/LogEvents.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.logger;
-
-public enum LogEvents {
-  // DO NOT CHANGE ORDER OF ENUMS, ORDER IS USED IN SERIALIZATION
-  OPEN,
-  DEFINE_TABLET,
-  MUTATION,
-  MANY_MUTATIONS,
-  COMPACTION_START,
-  COMPACTION_FINISH;
-}
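
The ordering warning above matters because LogFileKey (next file) persists
event.ordinal() as a single byte and decodes it with LogEvents.values()[b],
so inserting or reordering constants would silently reinterpret every byte
already written to a write-ahead log. A small round-trip sketch, assuming
the enum above is on the classpath (the OrdinalRoundTrip class is
illustrative):

import org.apache.accumulo.server.logger.LogEvents;

public class OrdinalRoundTrip {
  public static void main(String[] args) {
    LogEvents event = LogEvents.MUTATION;
    byte onDisk = (byte) event.ordinal();           // 2, fixed by declaration order
    LogEvents decoded = LogEvents.values()[onDisk]; // MUTATION, only while the order is stable
    System.out.println(decoded);
  }
}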

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/logger/LogFileKey.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/logger/LogFileKey.java b/server/src/main/java/org/apache/accumulo/server/logger/LogFileKey.java
deleted file mode 100644
index c69d409..0000000
--- a/server/src/main/java/org/apache/accumulo/server/logger/LogFileKey.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.logger;
-
-import static org.apache.accumulo.server.logger.LogEvents.DEFINE_TABLET;
-import static org.apache.accumulo.server.logger.LogEvents.MANY_MUTATIONS;
-import static org.apache.accumulo.server.logger.LogEvents.MUTATION;
-import static org.apache.accumulo.server.logger.LogEvents.OPEN;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.hadoop.io.WritableComparable;
-
-public class LogFileKey implements WritableComparable<LogFileKey> {
-  
-  public LogEvents event;
-  public String filename = null;
-  public KeyExtent tablet = null;
-  public long seq = -1;
-  public int tid = -1;
-  public static final int VERSION = 2;
-  public String tserverSession;
-  
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int value = in.readByte();
-    event = LogEvents.values()[value];
-    switch (event) {
-      case OPEN:
-        tid = in.readInt();
-        tserverSession = in.readUTF();
-        if (tid != VERSION) {
-          throw new RuntimeException(String.format("Bad version number for log file: expected %d, but saw %d", VERSION, tid));
-        }
-        break;
-      case COMPACTION_FINISH:
-        seq = in.readLong();
-        tid = in.readInt();
-        break;
-      case COMPACTION_START:
-        seq = in.readLong();
-        tid = in.readInt();
-        filename = in.readUTF();
-        break;
-      case DEFINE_TABLET:
-        seq = in.readLong();
-        tid = in.readInt();
-        tablet = new KeyExtent();
-        tablet.readFields(in);
-        break;
-      case MANY_MUTATIONS:
-        seq = in.readLong();
-        tid = in.readInt();
-        break;
-      case MUTATION:
-        seq = in.readLong();
-        tid = in.readInt();
-        break;
-      default:
-        throw new RuntimeException("Unknown log event type: " + event);
-    }
-    
-  }
-  
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeByte(event.ordinal());
-    switch (event) {
-      case OPEN:
-        seq = -1;
-        tid = -1;
-        out.writeInt(VERSION);
-        out.writeUTF(tserverSession);
-        // out.writeUTF(Accumulo.getInstanceID());
-        break;
-      case COMPACTION_FINISH:
-        out.writeLong(seq);
-        out.writeInt(tid);
-        break;
-      case COMPACTION_START:
-        out.writeLong(seq);
-        out.writeInt(tid);
-        out.writeUTF(filename);
-        break;
-      case DEFINE_TABLET:
-        out.writeLong(seq);
-        out.writeInt(tid);
-        tablet.write(out);
-        break;
-      case MANY_MUTATIONS:
-        out.writeLong(seq);
-        out.writeInt(tid);
-        break;
-      case MUTATION:
-        out.writeLong(seq);
-        out.writeInt(tid);
-        break;
-      default:
-        throw new IllegalArgumentException("Bad value for LogFileKey event type");
-    }
-  }
-  
-  static int eventType(LogEvents event) {
-    // Order log entries: OPEN first, then DEFINE_TABLET, then compaction events, then mutations
-    if (event == MUTATION || event == MANY_MUTATIONS) {
-      return 3;
-    }
-    if (event == DEFINE_TABLET) {
-      return 1;
-    }
-    if (event == OPEN) {
-      return 0;
-    }
-    return 2;
-  }
-  
-  private static int sign(long l) {
-    if (l < 0)
-      return -1;
-    if (l > 0)
-      return 1;
-    return 0;
-  }
-  
-  @Override
-  public int compareTo(LogFileKey o) {
-    if (eventType(this.event) != eventType(o.event)) {
-      return eventType(this.event) - eventType(o.event);
-    }
-    if (this.event == OPEN)
-      return 0;
-    if (this.tid != o.tid) {
-      return this.tid - o.tid;
-    }
-    return sign(this.seq - o.seq);
-  }
-  
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof LogFileKey) {
-      return this.compareTo((LogFileKey) obj) == 0;
-    }
-    return false;
-  }
-  
-  @Override
-  public int hashCode() {
-    return (int) seq;
-  }
-  
-  public static void printEntry(LogFileKey entry) {
-    System.out.println(entry.toString());
-  }
-  
-  @Override
-  public String toString() {
-    switch (event) {
-      case OPEN:
-        return String.format("OPEN %s", tserverSession);
-      case COMPACTION_FINISH:
-        return String.format("COMPACTION_FINISH %d %d", tid, seq);
-      case COMPACTION_START:
-        return String.format("COMPACTION_START %d %d %s", tid, seq, filename);
-      case MUTATION:
-        return String.format("MUTATION %d %d", tid, seq);
-      case MANY_MUTATIONS:
-        return String.format("MANY_MUTATIONS %d %d", tid, seq);
-      case DEFINE_TABLET:
-        return String.format("DEFINE_TABLET %d %d %s", tid, seq, tablet);
-    }
-    throw new RuntimeException("Unknown type of entry: " + event);
-  }
-}
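
eventType() above defines the recovery sort: OPEN entries first, then tablet
definitions, then compaction events, then mutations, with ties broken by tid
and then sequence number. A hedged sketch of that ordering, assuming
LogFileKey and LogEvents from this package are on the classpath (the
LogSortDemo class and the sample values are illustrative):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.accumulo.server.logger.LogEvents;
import org.apache.accumulo.server.logger.LogFileKey;

public class LogSortDemo {
  static LogFileKey key(LogEvents event, int tid, long seq) {
    LogFileKey k = new LogFileKey();
    k.event = event;
    k.tid = tid;
    k.seq = seq;
    return k;
  }

  public static void main(String[] args) {
    List<LogFileKey> keys = new ArrayList<LogFileKey>();
    keys.add(key(LogEvents.MUTATION, 1, 7));
    keys.add(key(LogEvents.COMPACTION_START, 1, 5));
    keys.add(key(LogEvents.DEFINE_TABLET, 1, 1));
    keys.add(key(LogEvents.OPEN, -1, -1));
    Collections.sort(keys);
    // prints OPEN, then DEFINE_TABLET, then COMPACTION_START, then MUTATION
    for (LogFileKey k : keys)
      System.out.println(k);
  }
}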

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/logger/LogFileValue.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/logger/LogFileValue.java b/server/src/main/java/org/apache/accumulo/server/logger/LogFileValue.java
deleted file mode 100644
index 6019b18..0000000
--- a/server/src/main/java/org/apache/accumulo/server/logger/LogFileValue.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.accumulo.core.data.ColumnUpdate;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.server.data.ServerMutation;
-import org.apache.hadoop.io.Writable;
-
-public class LogFileValue implements Writable {
-  
-  private static final List<Mutation> empty = Collections.emptyList();
-  
-  public List<Mutation> mutations = empty;
-  
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int count = in.readInt();
-    mutations = new ArrayList<Mutation>(count);
-    for (int i = 0; i < count; i++) {
-      ServerMutation mutation = new ServerMutation();
-      mutation.readFields(in);
-      mutations.add(mutation);
-    }
-  }
-  
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(mutations.size());
-    for (Mutation m : mutations) {
-      m.write(out);
-    }
-  }
-  
-  public static void print(LogFileValue value) {
-    System.out.println(value.toString());
-  }
-  
-  private static String displayLabels(byte[] labels) {
-    String s = new String(labels);
-    s = s.replace("&", " & ");
-    s = s.replace("|", " | ");
-    return s;
-  }
-  
-  public static String format(LogFileValue lfv, int maxMutations) {
-    if (lfv.mutations.size() == 0)
-      return "";
-    StringBuilder builder = new StringBuilder();
-    builder.append(lfv.mutations.size() + " mutations:\n");
-    int i = 0;
-    for (Mutation m : lfv.mutations) {
-      if (i++ >= maxMutations) {
-        builder.append("...");
-        break;
-      }
-      builder.append("  " + new String(m.getRow()) + "\n");
-      for (ColumnUpdate update : m.getUpdates()) {
-        String value = new String(update.getValue());
-        builder.append("      " + new String(update.getColumnFamily()) + ":" + new String(update.getColumnQualifier()) + " "
-            + (update.hasTimestamp() ? "[user]:" : "[system]:") + update.getTimestamp() + " [" + displayLabels(update.getColumnVisibility()) + "] "
-            + (update.isDeleted() ? "<deleted>" : value) + "\n");
-      }
-    }
-    return builder.toString();
-  }
-  
-  @Override
-  public String toString() {
-    return format(this, 5);
-  }
-  
-}
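
A hedged round-trip sketch for the Writable implementation above, assuming
LogFileValue and ServerMutation (with a Text-row constructor) are available
on the classpath; the LogFileValueRoundTrip class and the row/column values
are illustrative:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Collections;

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.server.data.ServerMutation;
import org.apache.accumulo.server.logger.LogFileValue;
import org.apache.hadoop.io.Text;

public class LogFileValueRoundTrip {
  public static void main(String[] args) throws Exception {
    ServerMutation m = new ServerMutation(new Text("row1"));
    m.put(new Text("cf"), new Text("cq"), new Value("v".getBytes()));

    LogFileValue out = new LogFileValue();
    out.mutations = Collections.<Mutation>singletonList(m);

    // serialize, then deserialize through readFields()
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    out.write(new DataOutputStream(buf));
    LogFileValue in = new LogFileValue();
    in.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

    System.out.print(in); // "1 mutations:" followed by row1's column update
  }
}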

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java b/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
deleted file mode 100644
index 9368819..0000000
--- a/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.logger;
-
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.accumulo.core.cli.Help;
-import org.apache.accumulo.core.conf.SiteConfiguration;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.tabletserver.log.DfsLogger;
-import org.apache.accumulo.server.tabletserver.log.DfsLogger.DFSLoggerInputStreams;
-import org.apache.accumulo.server.tabletserver.log.MultiReader;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-
-public class LogReader {
-  
-  static class Opts extends Help {
-    @Parameter(names = "-r", description = "print only mutations associated with the given row")
-    String row;
-    @Parameter(names = "-m", description = "limit the number of mutations printed per row")
-    int maxMutations = 5;
-    @Parameter(names = "-t", description = "print only mutations that fall within the given key extent")
-    String extent;
-    @Parameter(names = "-p", description = "search for a row that matches the given regex")
-    String regexp;
-    @Parameter(description = "<logfile> { <logfile> ... }")
-    List<String> files = new ArrayList<String>();
-  }
-  
-  /**
-   * Dump a log file (map or sequence file) to stdout. Will read from HDFS or the local file system.
-   * 
-   * @param args
-   *          the log files to print, plus options
-   * @throws IOException
-   */
-  public static void main(String[] args) throws IOException {
-    Opts opts = new Opts();
-    opts.parseArgs(LogReader.class.getName(), args);
-    VolumeManager fs = VolumeManagerImpl.get();
-    
-    Matcher rowMatcher = null;
-    KeyExtent ke = null;
-    Text row = null;
-    if (opts.files.isEmpty()) {
-      new JCommander(opts).usage();
-      return;
-    }
-    if (opts.row != null)
-      row = new Text(opts.row);
-    if (opts.extent != null) {
-      String sa[] = opts.extent.split(";");
-      ke = new KeyExtent(new Text(sa[0]), new Text(sa[1]), new Text(sa[2]));
-    }
-    if (opts.regexp != null) {
-      Pattern pattern = Pattern.compile(opts.regexp);
-      rowMatcher = pattern.matcher("");
-    }
-    
-    Set<Integer> tabletIds = new HashSet<Integer>();
-    
-    for (String file : opts.files) {
-      
-      Path path = new Path(file);
-      LogFileKey key = new LogFileKey();
-      LogFileValue value = new LogFileValue();
-      
-      if (fs.isFile(path)) {
-        // read log entries from a simple hdfs file
-        @SuppressWarnings("deprecation")
-        DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, path, SiteConfiguration.getSiteConfiguration());        
-        DataInputStream input =  streams.getDecryptingInputStream();
-
-        try {
-          while (true) {
-            try {
-              key.readFields(input);
-              value.readFields(input);
-            } catch (EOFException ex) {
-              break;
-            }
-            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
-          }
-        } finally {
-          input.close();
-        }
-      } else {
-        // read the log entries sorted in a map file
-        MultiReader input = new MultiReader(fs, path);
-        while (input.next(key, value)) {
-          printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
-        }
-      }
-    }
-  }
-  
-  public static void printLogEvent(LogFileKey key, LogFileValue value, Text row, Matcher rowMatcher, KeyExtent ke, Set<Integer> tabletIds, int maxMutations) {
-    
-    if (ke != null) {
-      if (key.event == LogEvents.DEFINE_TABLET) {
-        if (key.tablet.equals(ke)) {
-          tabletIds.add(key.tid);
-        } else {
-          return;
-        }
-      } else if (!tabletIds.contains(key.tid)) {
-        return;
-      }
-    }
-    
-    if (row != null || rowMatcher != null) {
-      if (key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) {
-        boolean found = false;
-        for (Mutation m : value.mutations) {
-          if (row != null && new Text(m.getRow()).equals(row)) {
-            found = true;
-            break;
-          }
-          
-          if (rowMatcher != null) {
-            rowMatcher.reset(new String(m.getRow()));
-            if (rowMatcher.matches()) {
-              found = true;
-              break;
-            }
-          }
-        }
-        
-        if (!found)
-          return;
-      } else {
-        return;
-      }
-      
-    }
-    
-    System.out.println(key);
-    System.out.println(LogFileValue.format(value, maxMutations));
-  }
-  
-}
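
As a usage note, LogReader is normally launched through the accumulo script
with the options defined in Opts above, along the lines of
"accumulo org.apache.accumulo.server.logger.LogReader -r row1 -m 10 /path/to/walog"
(the row value and the write-ahead log path are placeholders). Per the
parsing code above, the -t option expects an extent encoded as
tableId;endRow;prevEndRow, split on ';'.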

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/EventCoordinator.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/EventCoordinator.java b/server/src/main/java/org/apache/accumulo/server/master/EventCoordinator.java
deleted file mode 100644
index 7449bc3..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/EventCoordinator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master;
-
-import org.apache.log4j.Logger;
-
-public class EventCoordinator {
-  
-  private static final Logger log = Logger.getLogger(EventCoordinator.class);
-  long eventCounter = 0;
-  
-  synchronized long waitForEvents(long millis, long lastEvent) {
-    // Did something happen since the last time we waited?
-    if (lastEvent == eventCounter) {
-      // no
-      if (millis <= 0)
-        return eventCounter;
-      try {
-        wait(millis);
-      } catch (InterruptedException e) {
-        log.debug("ignoring InterruptedException", e);
-      }
-    }
-    return eventCounter;
-  }
-  
-  synchronized public void event(String msg, Object... args) {
-    log.info(String.format(msg, args));
-    eventCounter++;
-    notifyAll();
-  }
-  
-  public Listener getListener() {
-    return new Listener();
-  }
-  
-  public class Listener {
-    long lastEvent;
-    
-    Listener() {
-      lastEvent = eventCounter;
-    }
-    
-    public void waitForEvents(long millis) {
-      lastEvent = EventCoordinator.this.waitForEvents(millis, lastEvent);
-    }
-  }
-  
-}
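
EventCoordinator above is a plain wait()/notifyAll() rendezvous: event()
bumps a counter and wakes sleepers, and Listener.waitForEvents() returns
immediately if the counter has moved since the listener last looked. A
hedged usage sketch, assuming the class above is on the classpath (the
EventDemo class and the event message are illustrative):

import org.apache.accumulo.server.master.EventCoordinator;

public class EventDemo {
  public static void main(String[] args) throws InterruptedException {
    final EventCoordinator coordinator = new EventCoordinator();
    final EventCoordinator.Listener listener = coordinator.getListener();

    Thread waiter = new Thread(new Runnable() {
      @Override
      public void run() {
        listener.waitForEvents(60 * 1000); // wakes early once an event fires
        System.out.println("woke up");
      }
    });
    waiter.start();

    Thread.sleep(100); // give the waiter time to block
    coordinator.event("tablet %s assigned", "1b;m;a"); // logs, bumps counter, notifies
    waiter.join();
  }
}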

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
deleted file mode 100644
index 3eef065..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master;
-
-import static org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.util.AddressUtil;
-import org.apache.accumulo.core.util.ServerServices;
-import org.apache.accumulo.core.util.ThriftUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.server.util.Halt;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.trace.instrument.Tracer;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NotEmptyException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-import com.google.common.net.HostAndPort;
-
-public class LiveTServerSet implements Watcher {
-  
-  public interface Listener {
-    void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added);
-  }
-  
-  private static final Logger log = Logger.getLogger(LiveTServerSet.class);
-  
-  private final Listener cback;
-  private final Instance instance;
-  private final AccumuloConfiguration conf;
-  private ZooCache zooCache;
-  
-  public class TServerConnection {
-    private final HostAndPort address;
-    
-    public TServerConnection(HostAndPort addr) throws TException {
-      address = addr;
-    }
-    
-    private String lockString(ZooLock mlock) {
-      return mlock.getLockID().serialize(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK);
-    }
-    
-    public void assignTablet(ZooLock lock, KeyExtent extent) throws TException {
-      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-      try {
-        client.loadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
-      } finally {
-        ThriftUtil.returnClient(client);
-      }
-    }
-    
-    public void unloadTablet(ZooLock lock, KeyExtent extent, boolean save) throws TException {
-      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-      try {
-        client.unloadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift(), save);
-      } finally {
-        ThriftUtil.returnClient(client);
-      }
-    }
-    
-    public TabletServerStatus getTableMap(boolean usePooledConnection) throws TException, ThriftSecurityException {
-      
-      if (usePooledConnection)
-        throw new UnsupportedOperationException();
-      
-      TTransport transport = ThriftUtil.createTransport(address, conf);
-      
-      try {
-        TabletClientService.Client client = ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport);
-        return client.getTabletServerStatus(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance));
-      } finally {
-        if (transport != null)
-          transport.close();
-      }
-    }
-    
-    public void halt(ZooLock lock) throws TException, ThriftSecurityException {
-      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-      try {
-        client.halt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock));
-      } finally {
-        ThriftUtil.returnClient(client);
-      }
-    }
-    
-    public void fastHalt(ZooLock lock) throws TException {
-      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-      try {
-        client.fastHalt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock));
-      } finally {
-        ThriftUtil.returnClient(client);
-      }
-    }
-    
-    public void flush(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException {
-      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-      try {
-        client.flush(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId,
-            startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow));
-      } finally {
-        ThriftUtil.returnClient(client);
-      }
-    }
-    
-    public void chop(ZooLock lock, KeyExtent extent) throws TException {
-      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-      try {
-        client.chop(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
-      } finally {
-        ThriftUtil.returnClient(client);
-      }
-    }
-    
-    public void splitTablet(ZooLock lock, KeyExtent extent, Text splitPoint) throws TException, ThriftSecurityException, NotServingTabletException {
-      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-      try {
-        client.splitTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), extent.toThrift(),
-            ByteBuffer.wrap(splitPoint.getBytes(), 0, splitPoint.getLength()));
-      } finally {
-        ThriftUtil.returnClient(client);
-      }
-    }
-    
-    public void flushTablet(ZooLock lock, KeyExtent extent) throws TException {
-      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-      try {
-        client.flushTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
-      } finally {
-        ThriftUtil.returnClient(client);
-      }
-    }
-    
-    public void compact(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException {
-      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-      try {
-        client.compact(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId,
-            startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow));
-      } finally {
-        ThriftUtil.returnClient(client);
-      }
-    }
-    
-    public boolean isActive(long tid) throws TException {
-      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-      try {
-        return client.isActive(Tracer.traceInfo(), tid);
-      } finally {
-        ThriftUtil.returnClient(client);
-      }
-    }
-    
-  }
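
Every RPC wrapper in the connection class above follows the same shape: acquire a pooled Thrift client, make one traced call under the system credentials, and return the client in a finally block so the pool is never leaked. A minimal sketch of that shape, using hypothetical Pool and Client stand-ins rather than the real ThriftUtil and TabletClientService types:

    // Sketch only: Pool and Client are hypothetical stand-ins for
    // ThriftUtil.getClient/returnClient and TabletClientService.Client.
    interface Client {
      void halt(String lockId) throws Exception;
    }

    interface Pool {
      Client acquire();          // like ThriftUtil.getClient(...)
      void release(Client c);    // like ThriftUtil.returnClient(...)
    }

    class RpcWrapper {
      private final Pool pool;

      RpcWrapper(Pool pool) {
        this.pool = pool;
      }

      void halt(String lockId) throws Exception {
        Client client = pool.acquire();
        try {
          client.halt(lockId);   // the single remote call
        } finally {
          pool.release(client);  // always runs, even when the call throws
        }
      }
    }

getTableMap is the one exception to the pattern: it opens a dedicated transport and closes it itself instead of going through the pool.
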
-  
-  static class TServerInfo {
-    TServerConnection connection;
-    TServerInstance instance;
-    
-    TServerInfo(TServerInstance instance, TServerConnection connection) {
-      this.connection = connection;
-      this.instance = instance;
-    }
-  }
-  
-  // The set of active tservers with locks, indexed by their name in zookeeper
-  private Map<String,TServerInfo> current = new HashMap<String,TServerInfo>();
-  // as above, indexed by TServerInstance
-  private Map<TServerInstance,TServerInfo> currentInstances = new HashMap<TServerInstance,TServerInfo>();
-  
-  // The set of entries in zookeeper without locks, and the first time each was noticed
-  private Map<String,Long> locklessServers = new HashMap<String,Long>();
-  
-  public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) {
-    this.cback = cback;
-    this.instance = instance;
-    this.conf = conf;
-  }
-  
-  public synchronized ZooCache getZooCache() {
-    if (zooCache == null)
-      zooCache = new ZooCache(this);
-    return zooCache;
-  }
-  
-  public synchronized void startListeningForTabletServerChanges() {
-    scanServers();
-    SimpleTimer.getInstance().schedule(new Runnable() {
-      @Override
-      public void run() {
-        scanServers();
-      }
-    }, 0, 5000);
-  }
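
startListeningForTabletServerChanges scans once synchronously and then schedules the same scan every five seconds. The same schedule could be expressed with the JDK's ScheduledExecutorService instead of Accumulo's SimpleTimer; a self-contained sketch, where the printed call is a stand-in for the real scan:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class PeriodicScan {
      public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // initial delay 0 ms, then every 5000 ms, mirroring the schedule above
        timer.scheduleAtFixedRate(new Runnable() {
          @Override
          public void run() {
            System.out.println("scanServers()"); // stand-in for the real scan
          }
        }, 0, 5000, TimeUnit.MILLISECONDS);
      }
    }
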
-  
-  public synchronized void scanServers() {
-    try {
-      final Set<TServerInstance> updates = new HashSet<TServerInstance>();
-      final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
-      
-      final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
-      
-      HashSet<String> all = new HashSet<String>(current.keySet());
-      all.addAll(getZooCache().getChildren(path));
-      
-      locklessServers.keySet().retainAll(all);
-      
-      for (String zPath : all) {
-        checkServer(updates, doomed, path, zPath);
-      }
-      
-      // log.debug("Current: " + current.keySet());
-      if (!doomed.isEmpty() || !updates.isEmpty())
-        this.cback.update(this, doomed, updates);
-    } catch (Exception ex) {
-      log.error(ex, ex);
-    }
-  }
-  
-  private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException {
-    try {
-      ZooReaderWriter.getInstance().delete(serverNode, -1);
-    } catch (NotEmptyException ex) {
-      // race condition: tserver created the lock after our last check; we'll see it at the next check
-    } catch (NoNodeException nne) {
-      // someone else deleted it
-    }
-  }
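
deleteServerNode passes version -1, which tells ZooKeeper to delete the node regardless of its current version, and the two swallowed exceptions cover exactly the benign races noted in the comments. A minimal sketch against the raw ZooKeeper client (the real code goes through Accumulo's ZooReaderWriter):

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;

    class DeleteNode {
      static void deleteQuietly(ZooKeeper zk, String path) throws InterruptedException, KeeperException {
        try {
          zk.delete(path, -1); // version -1 matches any node version
        } catch (KeeperException.NotEmptyException ex) {
          // a lock child appeared after the last check; the next scan will see it
        } catch (KeeperException.NoNodeException ex) {
          // someone else already deleted it; nothing to do
        }
      }
    }
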
-  
-  private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String zPath)
-      throws TException, InterruptedException, KeeperException {
-    
-    TServerInfo info = current.get(zPath);
-    
-    final String lockPath = path + "/" + zPath;
-    Stat stat = new Stat();
-    byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat);
-    
-    if (lockData == null) {
-      if (info != null) {
-        doomed.add(info.instance);
-        current.remove(zPath);
-        currentInstances.remove(info.instance);
-      }
-      
-      Long firstSeen = locklessServers.get(zPath);
-      if (firstSeen == null) {
-        locklessServers.put(zPath, System.currentTimeMillis());
-      } else if (System.currentTimeMillis() - firstSeen > 10 * 60 * 1000) {
-        deleteServerNode(path + "/" + zPath);
-        locklessServers.remove(zPath);
-      }
-    } else {
-      locklessServers.remove(zPath);
-      ServerServices services = new ServerServices(new String(lockData));
-      HostAndPort client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
-      TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner());
-      
-      if (info == null) {
-        updates.add(instance);
-        TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client));
-        current.put(zPath, tServerInfo);
-        currentInstances.put(instance, tServerInfo);
-      } else if (!info.instance.equals(instance)) {
-        doomed.add(info.instance);
-        updates.add(instance);
-        TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client));
-        current.put(zPath, tServerInfo);
-        // index the new instance and drop the doomed one, so both maps stay in sync
-        currentInstances.remove(info.instance);
-        currentInstances.put(instance, tServerInfo);
-      }
-    }
-  }
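
The lockless-server handling in checkServer gives a zookeeper entry without a lock a ten-minute grace period before its node is deleted: the first sighting only records a timestamp, and deletion happens on a later scan once the entry has stayed lockless past the threshold. That bookkeeping in isolation, as a small sketch:

    import java.util.HashMap;
    import java.util.Map;

    class LocklessSweep {
      private static final long GRACE_MS = 10 * 60 * 1000; // ten minutes, as above

      // zookeeper node name -> when it was first seen without a lock
      private final Map<String,Long> firstSeen = new HashMap<String,Long>();

      // true once the entry has been lockless longer than the grace period
      boolean shouldDelete(String zPath, long now) {
        Long seen = firstSeen.get(zPath);
        if (seen == null) {
          firstSeen.put(zPath, now); // first sighting: start the clock
          return false;
        }
        return now - seen > GRACE_MS;
      }

      // call when the server shows up holding a lock again
      void clear(String zPath) {
        firstSeen.remove(zPath);
      }
    }
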
-  
-  @Override
-  public void process(WatchedEvent event) {
-    
-    // it's important that these events are propagated through ZooCache, because this ensures ZooCache has already processed the event and cleared the
-    // relevant nodes before the code below reads from it
-    
-    if (event.getPath() != null) {
-      if (event.getPath().endsWith(Constants.ZTSERVERS)) {
-        scanServers();
-      } else if (event.getPath().contains(Constants.ZTSERVERS)) {
-        int pos = event.getPath().lastIndexOf('/');
-        
-        // act only if ZTSERVERS is the direct parent of the event path
-        if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS)) {
-          
-          String server = event.getPath().substring(pos + 1);
-          
-          final Set<TServerInstance> updates = new HashSet<TServerInstance>();
-          final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
-          
-          final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
-          
-          try {
-            checkServer(updates, doomed, path, server);
-            if (!doomed.isEmpty() || !updates.isEmpty())
-              this.cback.update(this, doomed, updates);
-          } catch (Exception ex) {
-            log.error(ex, ex);
-          }
-        }
-      }
-    }
-  }
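
The watcher reacts to two kinds of paths: an event on the ZTSERVERS node itself triggers a full rescan, while an event on a direct child triggers a check of just that server. The direct-child test strips the last path component and asks whether what remains ends with ZTSERVERS; a runnable sketch of that test, where the constant's value is an assumption rather than the real Constants.ZTSERVERS:

    class PathCheck {
      static final String ZTSERVERS = "/tservers"; // hypothetical stand-in value

      // returns the server's node name for a direct child, otherwise null
      static String directChildOrNull(String eventPath) {
        int pos = eventPath.lastIndexOf('/');
        if (pos >= 0 && eventPath.substring(0, pos).endsWith(ZTSERVERS))
          return eventPath.substring(pos + 1);
        return null;
      }

      public static void main(String[] args) {
        System.out.println(directChildOrNull("/accumulo/id/tservers/host:9997"));         // host:9997
        System.out.println(directChildOrNull("/accumulo/id/tservers/host:9997/zlock-0")); // null
      }
    }
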
-  
-  public synchronized TServerConnection getConnection(TServerInstance server) throws TException {
-    if (server == null)
-      return null;
-    TServerInfo tServerInfo = currentInstances.get(server);
-    if (tServerInfo == null)
-      return null;
-    return tServerInfo.connection;
-  }
-  
-  public synchronized Set<TServerInstance> getCurrentServers() {
-    return new HashSet<TServerInstance>(currentInstances.keySet());
-  }
-  
-  public synchronized int size() {
-    return current.size();
-  }
-  
-  public synchronized TServerInstance find(String tabletServer) {
-    HostAndPort addr = AddressUtil.parseAddress(tabletServer);
-    for (Entry<String,TServerInfo> entry : current.entrySet()) {
-      if (entry.getValue().instance.getLocation().equals(addr))
-        return entry.getValue().instance;
-    }
-    return null;
-  }
-  
-  public synchronized void remove(TServerInstance server) {
-    String zPath = null;
-    for (Entry<String,TServerInfo> entry : current.entrySet()) {
-      if (entry.getValue().instance.equals(server)) {
-        zPath = entry.getKey();
-        break;
-      }
-    }
-    if (zPath == null)
-      return;
-    current.remove(zPath);
-    currentInstances.remove(server);
-    
-    log.info("Removing zookeeper lock for " + server);
-    String fullpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + zPath;
-    try {
-      ZooReaderWriter.getRetryingInstance().recursiveDelete(fullpath, SKIP);
-    } catch (Exception e) {
-      String msg = "error removing tablet server lock";
-      log.fatal(msg, e);
-      Halt.halt(msg, -1);
-    }
-    getZooCache().clear(fullpath);
-  }
-}
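
The class maintains one invariant throughout: a live server's TServerInfo must be reachable both by its zookeeper node name (current) and by its TServerInstance (currentInstances), so every mutation has to update the two maps together under the same lock. A generic sketch of that dual-index shape:

    import java.util.HashMap;
    import java.util.Map;

    class DualIndex<K1,K2,V> {
      private final Map<K1,V> byName = new HashMap<K1,V>();     // like current
      private final Map<K2,V> byInstance = new HashMap<K2,V>(); // like currentInstances

      synchronized void put(K1 name, K2 instance, V value) {
        byName.put(name, value);
        byInstance.put(instance, value);
      }

      synchronized void remove(K1 name, K2 instance) {
        byName.remove(name);
        byInstance.remove(instance);
      }

      synchronized int size() {
        return byName.size(); // the two maps always have equal size
      }
    }
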