You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/05 06:43:16 UTC

[34/35] git commit: Merge remote-tracking branch 'origin/master' into ACCUMULO-378

Merge remote-tracking branch 'origin/master' into ACCUMULO-378

Conflicts:
	server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ada6ce46
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ada6ce46
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ada6ce46

Branch: refs/heads/ACCUMULO-378
Commit: ada6ce464b1e9d818c06655369b30a45afa840c0
Parents: 98eb56f 47d5933
Author: Josh Elser <el...@apache.org>
Authored: Wed Jun 4 22:07:59 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Jun 4 22:07:59 2014 -0400

----------------------------------------------------------------------
 .../server/GarbageCollectionLogger.java         | 103 ++++
 .../apache/accumulo/tserver/TabletServer.java   | 607 ++++++-------------
 .../accumulo/tserver/TservConstraintEnv.java    |  83 +++
 .../apache/accumulo/tserver/log/DfsLogger.java  |  20 +-
 .../tserver/session/ConditionalSession.java     |  41 ++
 .../tserver/session/MultiScanSession.java       |  62 ++
 .../accumulo/tserver/session/ScanSession.java   |  69 +++
 .../accumulo/tserver/session/Session.java       |  43 ++
 .../accumulo/tserver/session/UpdateSession.java |  56 ++
 .../apache/accumulo/tserver/tablet/Tablet.java  |  12 +-
 .../apache/accumulo/test/AuditMessageIT.java    |   5 +
 11 files changed, 649 insertions(+), 452 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ada6ce46/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index e6286ff,ee28c7f..dd3c16e
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -78,11 -76,7 +76,8 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
  import org.apache.accumulo.core.conf.Property;
- import org.apache.accumulo.core.constraints.Constraint.Environment;
- import org.apache.accumulo.core.constraints.Violations;
- import org.apache.accumulo.core.data.ByteSequence;
  import org.apache.accumulo.core.data.Column;
 +import org.apache.accumulo.core.data.ColumnUpdate;
  import org.apache.accumulo.core.data.ConstraintViolationSummary;
  import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.KeyExtent;
@@@ -116,8 -110,6 +111,7 @@@ import org.apache.accumulo.core.master.
  import org.apache.accumulo.core.metadata.MetadataTable;
  import org.apache.accumulo.core.metadata.RootTable;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
- import org.apache.accumulo.core.security.AuthorizationContainer;
  import org.apache.accumulo.core.security.Authorizations;
  import org.apache.accumulo.core.security.SecurityUtil;
  import org.apache.accumulo.core.security.thrift.TCredentials;
@@@ -213,8 -205,11 +207,13 @@@ import org.apache.accumulo.tserver.metr
  import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
  import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
  import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
 +import org.apache.accumulo.tserver.replication.ReplicationServicerHandler;
 +import org.apache.accumulo.tserver.replication.ReplicationWorker;
+ import org.apache.accumulo.tserver.session.ConditionalSession;
+ import org.apache.accumulo.tserver.session.MultiScanSession;
+ import org.apache.accumulo.tserver.session.ScanSession;
+ import org.apache.accumulo.tserver.session.Session;
+ import org.apache.accumulo.tserver.session.UpdateSession;
  import org.apache.accumulo.tserver.tablet.CommitSession;
  import org.apache.accumulo.tserver.tablet.CompactionInfo;
  import org.apache.accumulo.tserver.tablet.CompactionWatcher;
@@@ -262,18 -258,56 +262,61 @@@ public class TabletServer extends Abstr
      return mincMetrics;
    }
  
-   private ServerConfiguration serverConfig;
-   private LogSorter logSorter = null;
+   private final ServerConfiguration serverConfig;
+   private final LogSorter logSorter;
 +  private ReplicationWorker replWorker = null;
+   private final TabletStatsKeeper statsKeeper;
+   private final AtomicInteger logIdGenerator = new AtomicInteger();
+   
+   private final VolumeManager fs;
+   public Instance getInstance() {
+     return serverConfig.getInstance();
+   }
+ 
+   private final SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
+   private final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
+   private final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
+   @SuppressWarnings("unchecked")
+   private final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap(1000));
+ 
+   private final TabletServerResourceManager resourceManager;
+   private final SecurityOperation security;
+ 
+   private final BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>();
+ 
+   private Thread majorCompactorThread;
+ 
++  private HostAndPort replicationAddress;
+   private HostAndPort clientAddress;
+ 
+   private volatile boolean serverStopRequested = false;
+   private volatile boolean majorCompactorDisabled = false;
+   private volatile boolean shutdownComplete = false;
+ 
+   private ZooLock tabletServerLock;
+ 
+   private TServer server;
++  private TServer replServer;
+ 
+   private DistributedWorkQueue bulkFailedCopyQ;
+ 
+   private String lockID;
+ 
+   private static ObjectName OBJECT_NAME = null;
+ 
+   public static final AtomicLong seekCount = new AtomicLong(0);
+   
+   private final AtomicLong totalMinorCompactions = new AtomicLong(0);
  
    public TabletServer(ServerConfiguration conf, VolumeManager fs) {
      super();
      this.serverConfig = conf;
-     this.instance = conf.getInstance();
      this.fs = fs;
      AccumuloConfiguration aconf = getSystemConfiguration();
 -    this.logSorter = new LogSorter(getInstance(), fs, aconf);
++    Instance instance = getInstance();
 +    this.logSorter = new LogSorter(instance, fs, aconf);
 +    this.replWorker = new ReplicationWorker(instance, fs, aconf);
+     this.statsKeeper = new TabletStatsKeeper();
      SimpleTimer.getInstance(aconf).schedule(new Runnable() {
        @Override
        public void run() {
@@@ -3122,29 -2856,6 +2880,29 @@@
      return address;
    }
  
 +  private HostAndPort startReplicationService() throws UnknownHostException {
 +    ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance()));
 +    ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
 +    AccumuloConfiguration conf = getSystemConfiguration();
 +    Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
 +    ServerAddress sp = TServerUtils.startServer(conf, clientAddress.getHostText(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor,
 +        "ReplicationServicerHandler", "Replication Servicer", null, Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty);
 +    this.replServer = sp.server;
 +    log.info("Started replication service on " + sp.address);
 +
 +    try {
 +      // The replication service is unique to the thrift service for a tserver, not just a host.
 +      // Advertise the host and port for replication service given the host and port for the tserver.
-       ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZREPLICATION_TSERVERS + "/" + clientAddress.toString(),
++      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + Constants.ZREPLICATION_TSERVERS + "/" + clientAddress.toString(),
 +          sp.address.toString().getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE);
 +    } catch (Exception e) {
 +      log.error("Could not advertise replication service port", e);
 +      throw new RuntimeException(e);
 +    }
 +
 +    return sp.address;
 +  }
 +
    public ZooLock getLock() {
      return tabletServerLock;
    }
@@@ -3527,6 -3205,6 +3280,13 @@@
      return clientAddress.getHostText() + ":" + clientAddress.getPort();
    }
  
++  public String getReplicationAddressSTring() {
++    if (null == replicationAddress) {
++      return null;
++    }
++    return replicationAddress.getHostText() + ":" + replicationAddress.getPort();
++  }
++
    public TServerInstance getTabletSession() {
      String address = getClientAddressString();
      if (address == null)

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ada6ce46/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------