You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/14 06:55:34 UTC
[34/51] [abbrv] git commit: Merge remote-tracking branch 'origin/master' into ACCUMULO-378
Merge remote-tracking branch 'origin/master' into ACCUMULO-378
Conflicts:
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ada6ce46
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ada6ce46
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ada6ce46
Branch: refs/heads/master
Commit: ada6ce464b1e9d818c06655369b30a45afa840c0
Parents: 98eb56f 47d5933
Author: Josh Elser <el...@apache.org>
Authored: Wed Jun 4 22:07:59 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Jun 4 22:07:59 2014 -0400
----------------------------------------------------------------------
.../server/GarbageCollectionLogger.java | 103 ++++
.../apache/accumulo/tserver/TabletServer.java | 607 ++++++-------------
.../accumulo/tserver/TservConstraintEnv.java | 83 +++
.../apache/accumulo/tserver/log/DfsLogger.java | 20 +-
.../tserver/session/ConditionalSession.java | 41 ++
.../tserver/session/MultiScanSession.java | 62 ++
.../accumulo/tserver/session/ScanSession.java | 69 +++
.../accumulo/tserver/session/Session.java | 43 ++
.../accumulo/tserver/session/UpdateSession.java | 56 ++
.../apache/accumulo/tserver/tablet/Tablet.java | 12 +-
.../apache/accumulo/test/AuditMessageIT.java | 5 +
11 files changed, 649 insertions(+), 452 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ada6ce46/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index e6286ff,ee28c7f..dd3c16e
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -78,11 -76,7 +76,8 @@@ import org.apache.accumulo.core.client.
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
- import org.apache.accumulo.core.constraints.Constraint.Environment;
- import org.apache.accumulo.core.constraints.Violations;
- import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.ConstraintViolationSummary;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
@@@ -116,8 -110,6 +111,7 @@@ import org.apache.accumulo.core.master.
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
- import org.apache.accumulo.core.security.AuthorizationContainer;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.SecurityUtil;
import org.apache.accumulo.core.security.thrift.TCredentials;
@@@ -213,8 -205,11 +207,13 @@@ import org.apache.accumulo.tserver.metr
import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
+import org.apache.accumulo.tserver.replication.ReplicationServicerHandler;
+import org.apache.accumulo.tserver.replication.ReplicationWorker;
+ import org.apache.accumulo.tserver.session.ConditionalSession;
+ import org.apache.accumulo.tserver.session.MultiScanSession;
+ import org.apache.accumulo.tserver.session.ScanSession;
+ import org.apache.accumulo.tserver.session.Session;
+ import org.apache.accumulo.tserver.session.UpdateSession;
import org.apache.accumulo.tserver.tablet.CommitSession;
import org.apache.accumulo.tserver.tablet.CompactionInfo;
import org.apache.accumulo.tserver.tablet.CompactionWatcher;
@@@ -262,18 -258,56 +262,61 @@@ public class TabletServer extends Abstr
return mincMetrics;
}
- private ServerConfiguration serverConfig;
- private LogSorter logSorter = null;
+ private final ServerConfiguration serverConfig;
+ private final LogSorter logSorter;
+ private ReplicationWorker replWorker = null;
+ private final TabletStatsKeeper statsKeeper;
+ private final AtomicInteger logIdGenerator = new AtomicInteger();
+
+ private final VolumeManager fs;
+ public Instance getInstance() {
+ return serverConfig.getInstance();
+ }
+
+ private final SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
+ private final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
+ private final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
+ @SuppressWarnings("unchecked")
+ private final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap(1000));
+
+ private final TabletServerResourceManager resourceManager;
+ private final SecurityOperation security;
+
+ private final BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>();
+
+ private Thread majorCompactorThread;
+
++ private HostAndPort replicationAddress;
+ private HostAndPort clientAddress;
+
+ private volatile boolean serverStopRequested = false;
+ private volatile boolean majorCompactorDisabled = false;
+ private volatile boolean shutdownComplete = false;
+
+ private ZooLock tabletServerLock;
+
+ private TServer server;
++ private TServer replServer;
+
+ private DistributedWorkQueue bulkFailedCopyQ;
+
+ private String lockID;
+
+ private static ObjectName OBJECT_NAME = null;
+
+ public static final AtomicLong seekCount = new AtomicLong(0);
+
+ private final AtomicLong totalMinorCompactions = new AtomicLong(0);
public TabletServer(ServerConfiguration conf, VolumeManager fs) {
super();
this.serverConfig = conf;
- this.instance = conf.getInstance();
this.fs = fs;
AccumuloConfiguration aconf = getSystemConfiguration();
- this.logSorter = new LogSorter(getInstance(), fs, aconf);
++ Instance instance = getInstance();
+ this.logSorter = new LogSorter(instance, fs, aconf);
+ this.replWorker = new ReplicationWorker(instance, fs, aconf);
+ this.statsKeeper = new TabletStatsKeeper();
SimpleTimer.getInstance(aconf).schedule(new Runnable() {
@Override
public void run() {
@@@ -3122,29 -2856,6 +2880,29 @@@
return address;
}
+ private HostAndPort startReplicationService() throws UnknownHostException {
+ ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance()));
+ ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
+ AccumuloConfiguration conf = getSystemConfiguration();
+ Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
+ ServerAddress sp = TServerUtils.startServer(conf, clientAddress.getHostText(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor,
+ "ReplicationServicerHandler", "Replication Servicer", null, Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty);
+ this.replServer = sp.server;
+ log.info("Started replication service on " + sp.address);
+
+ try {
+ // The replication service is unique to the thrift service for a tserver, not just a host.
+ // Advertise the host and port for replication service given the host and port for the tserver.
- ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZREPLICATION_TSERVERS + "/" + clientAddress.toString(),
++ ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + Constants.ZREPLICATION_TSERVERS + "/" + clientAddress.toString(),
+ sp.address.toString().getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE);
+ } catch (Exception e) {
+ log.error("Could not advertise replication service port", e);
+ throw new RuntimeException(e);
+ }
+
+ return sp.address;
+ }
+
public ZooLock getLock() {
return tabletServerLock;
}
@@@ -3527,6 -3205,6 +3280,13 @@@
return clientAddress.getHostText() + ":" + clientAddress.getPort();
}
++ public String getReplicationAddressSTring() {
++ if (null == replicationAddress) {
++ return null;
++ }
++ return replicationAddress.getHostText() + ":" + replicationAddress.getPort();
++ }
++
public TServerInstance getTabletSession() {
String address = getClientAddressString();
if (address == null)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ada6ce46/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------