You are viewing a plain-text version of this content; the hyperlink to its canonical version did not survive the conversion to plain text.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/03/05 22:13:13 UTC
[1/4] git commit: ACCUMULO-2412 use only pre-existing merge requests before processing the metadata table
Repository: accumulo
Updated Branches:
refs/heads/1.6.0-SNAPSHOT 4fd8686e4 -> 1392e07fb
ACCUMULO-2412 use only pre-existing merge requests before processing the metadata table
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8e7a8a2c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8e7a8a2c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8e7a8a2c
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 8e7a8a2ccfd8f124f532fed747852c87f8b05cdd
Parents: 9b5c53d
Author: Eric Newton <er...@gmail.com>
Authored: Wed Mar 5 15:43:23 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Wed Mar 5 15:43:23 2014 -0500
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/server/master/Master.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8e7a8a2c/server/src/main/java/org/apache/accumulo/server/master/Master.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/Master.java b/server/src/main/java/org/apache/accumulo/server/master/Master.java
index 70f87ae..e72fe3f 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/Master.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java
@@ -1288,6 +1288,11 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
int unloaded = 0;
try {
Map<Text,MergeStats> mergeStatsCache = new HashMap<Text,MergeStats>();
+ for (MergeInfo merge : merges()) {
+ if (merge.getRange() != null) {
+ mergeStatsCache.put(merge.getRange().getTableId(), new MergeStats(merge));
+ }
+ }
// Get the current status for the current list of tservers
SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<TServerInstance,TabletServerStatus>();
@@ -1334,7 +1339,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
Text tableId = tls.extent.getTableId();
MergeStats mergeStats = mergeStatsCache.get(tableId);
if (mergeStats == null) {
- mergeStatsCache.put(tableId, mergeStats = new MergeStats(getMergeInfo(tls.extent)));
+ mergeStatsCache.put(tableId, mergeStats = new MergeStats(new MergeInfo()));
}
TabletGoalState goal = getGoalState(tls, mergeStats.getMergeInfo());
TServerInstance server = tls.getServer();
[3/4] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT
Posted by ec...@apache.org.
Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b1351112
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b1351112
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b1351112
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: b1351112f6aad106c1f74a80e883248f12547f85
Parents: 8e7a8a2 4cd12c4
Author: Eric Newton <er...@gmail.com>
Authored: Wed Mar 5 15:51:33 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Wed Mar 5 15:51:33 2014 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------
[2/4] git commit: ACCUMULO-2412 use only pre-existing merge requests before processing the metadata table
Posted by ec...@apache.org.
ACCUMULO-2412 use only pre-existing merge requests before processing the metadata table
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4cd12c42
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4cd12c42
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4cd12c42
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 4cd12c4251a7a6ac4d04b23cf7c736d81acd33de
Parents: 0f23245
Author: Eric Newton <er...@gmail.com>
Authored: Wed Mar 5 15:50:52 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Wed Mar 5 15:50:52 2014 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/accumulo/server/master/Master.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4cd12c42/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/master/Master.java b/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
index f969f1b..aa8305b 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
@@ -1293,6 +1293,11 @@ public class Master implements LiveTServerSet.Listener, LoggerWatcher, TableObse
int unloaded = 0;
try {
Map<Text,MergeStats> mergeStatsCache = new HashMap<Text,MergeStats>();
+ for (MergeInfo merge : merges()) {
+ if (merge.getRange() != null) {
+ mergeStatsCache.put(merge.getRange().getTableId(), new MergeStats(merge));
+ }
+ }
// Get the current status for the current list of tservers
SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<TServerInstance,TabletServerStatus>();
[4/4] git commit: ACCUMULO-2412 use only pre-existing merge requests before processing the metadata table
Posted by ec...@apache.org.
ACCUMULO-2412 use only pre-existing merge requests before processing the metadata table
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1392e07f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1392e07f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1392e07f
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 1392e07fb5a4d5efe803810bcd888568cc57d9b9
Parents: 4fd8686 b135111
Author: Eric Newton <er...@gmail.com>
Authored: Wed Mar 5 16:12:07 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Wed Mar 5 16:12:07 2014 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/accumulo/master/Master.java | 4 ----
.../java/org/apache/accumulo/master/TabletGroupWatcher.java | 7 ++++++-
2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1392e07f/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index 6a75872,0000000..e123b49
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -1,1231 -1,0 +1,1227 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.Namespaces;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.ThriftTransportPool;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.master.thrift.MasterClientService.Iface;
+import org.apache.accumulo.core.master.thrift.MasterClientService.Processor;
+import org.apache.accumulo.core.master.thrift.MasterGoalState;
+import org.apache.accumulo.core.master.thrift.MasterState;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.NamespacePermission;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.AgeOffStore;
+import org.apache.accumulo.fate.Fate;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.master.recovery.RecoveryManager;
+import org.apache.accumulo.master.state.TableCounts;
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManager.FileType;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.init.Initialize;
+import org.apache.accumulo.server.master.LiveTServerSet;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
+import org.apache.accumulo.server.master.balancer.TabletBalancer;
+import org.apache.accumulo.server.master.state.CurrentState;
+import org.apache.accumulo.server.master.state.DeadServerList;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.RootTabletStateStore;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.accumulo.server.master.state.TabletState;
+import org.apache.accumulo.server.master.state.ZooStore;
+import org.apache.accumulo.server.master.state.ZooTabletStateStore;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.security.handler.ZKPermHandler;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.tables.TableObserver;
+import org.apache.accumulo.server.util.DefaultMap;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * The Master is responsible for assigning and balancing tablets to tablet servers.
+ *
+ * The master will also coordinate log recoveries and reports general status.
+ */
+public class Master implements LiveTServerSet.Listener, TableObserver, CurrentState {
+
+ // Logger shared by this class and its inner classes.
+ final static Logger log = Logger.getLogger(Master.class);
+
+ // ONE_SECOND is 1000; durations derived from it are presumably milliseconds
+ // (e.g. TIME_BETWEEN_MIGRATION_CLEANUPS is passed to UtilWaitThread.sleep) -- confirm per use.
+ final static int ONE_SECOND = 1000;
+ // Text forms of the metadata/root table ids; METADATA_TABLE_ID is used to look up per-table watcher stats.
+ final private static Text METADATA_TABLE_ID = new Text(MetadataTable.ID);
+ final private static Text ROOT_TABLE_ID = new Text(RootTable.ID);
+ final static long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
+ final private static long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
+ final static long WAIT_BETWEEN_ERRORS = ONE_SECOND;
+ final private static long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
+ final private static int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
+ final private static int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND;
+ final static int MAX_TSERVER_WORK_CHUNK = 5000;
+ final private static int MAX_BAD_STATUS_COUNT = 3;
+
+ // fs, instance, hostname, tserverSet and security are assigned in the constructor.
+ final VolumeManager fs;
+ final private Instance instance;
+ final private String hostname;
+ final LiveTServerSet tserverSet;
+ final private List<TabletGroupWatcher> watchers = new ArrayList<TabletGroupWatcher>();
+ final SecurityOperation security;
+ // Per-server counters held in a synchronized DefaultMap whose default value is an AtomicInteger.
+ // NOTE(review): presumably counts bad status reports against MAX_BAD_STATUS_COUNT -- confirm.
+ final Map<TServerInstance,AtomicInteger> badServers = Collections.synchronizedMap(new DefaultMap<TServerInstance,AtomicInteger>(new AtomicInteger()));
+ // Servers being shut down; getGoalState unassigns tablets currently hosted on them.
+ final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<TServerInstance>());
+ // Pending migrations: tablet extent -> destination server. Iteration must hold the
+ // map's monitor (see clearMigrations), as required for Collections.synchronizedSortedMap.
+ final SortedMap<KeyExtent,TServerInstance> migrations = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>());
+ // Used to announce state changes (e.g. master state and merge state transitions).
+ final EventCoordinator nextEvent = new EventCoordinator();
+ // Monitor guarding merge-state reads/writes in ZooKeeper; notifyAll() is called after each change.
+ final private Object mergeLock = new Object();
+ RecoveryManager recoveryManager = null;
+
+ ZooLock masterLock = null;
+ // Thrift client service; stopped by the timer task scheduled when the state becomes STOP.
+ private TServer clientService = null;
+ TabletBalancer tabletBalancer;
+
+ // Current master state; the visible code reads/writes it through synchronized getMasterState/setMasterState.
+ private MasterState state = MasterState.INITIAL;
+
+ Fate<Master> fate;
+
+ // Latest tablet-server status snapshot; volatile and unmodifiable, so presumably replaced wholesale -- confirm.
+ volatile SortedMap<TServerInstance,TabletServerStatus> tserverStatus = Collections.unmodifiableSortedMap(new TreeMap<TServerInstance,TabletServerStatus>());
+
+ /**
+ * Returns the current master state. Synchronized so it pairs with setMasterState.
+ */
+ synchronized MasterState getMasterState() {
+ return state;
+ }
+
+ /**
+ * @return true until the master state has reached STOP; background loops use this as their run condition
+ */
+ public boolean stillMaster() {
+ return getMasterState() != MasterState.STOP;
+ }
+
+ // Shorthand markers for the transition table below: X = allowed, _ = not allowed.
+ // NOTE(review): `_` as an identifier is a warning on Java 8 and a compile error from Java 9 onward.
+ static final boolean X = true;
+ static final boolean _ = false;
+ // Indexed as transitionOK[from.ordinal()][to.ordinal()] by setMasterState; rows and columns
+ // assume MasterState's declaration order matches the header comment -- keep them in sync.
+ // @formatter:off
+ static final boolean transitionOK[][] = {
+ // INITIAL HAVE_LOCK SAFE_MODE NORMAL UNLOAD_META UNLOAD_ROOT STOP
+ /* INITIAL */ {X, X, _, _, _, _, X},
+ /* HAVE_LOCK */ {_, X, X, X, _, _, X},
+ /* SAFE_MODE */ {_, _, X, X, X, _, X},
+ /* NORMAL */ {_, _, X, X, X, _, X},
+ /* UNLOAD_METADATA_TABLETS */ {_, _, X, X, X, X, X},
+ /* UNLOAD_ROOT_TABLET */ {_, _, _, X, X, X, X},
+ /* STOP */ {_, _, _, _, _, X, X}};
+ //@formatter:on
+ /**
+ * Moves the master to a new state and fires a state-change event.
+ *
+ * A transition not permitted by transitionOK is logged as an error but is still applied.
+ * Entering STOP schedules a timer task that stops the client service. Entering HAVE_LOCK runs
+ * the ZooKeeper upgrade, and entering NORMAL starts the metadata upgrade.
+ *
+ * @param newState the state to enter; a no-op if equal to the current state
+ */
+ synchronized void setMasterState(MasterState newState) {
+ if (state.equals(newState))
+ return;
+ if (!transitionOK[state.ordinal()][newState.ordinal()]) {
+ log.error("Programmer error: master should not transition from " + state + " to " + newState);
+ }
+ MasterState oldState = state;
+ state = newState;
+ nextEvent.event("State changed from %s to %s", oldState, newState);
+ if (newState == MasterState.STOP) {
+ // Give the server a little time before shutdown so the client
+ // thread requesting the stop can return
+ // NOTE(review): if schedule's longs are (delay, period), this task re-runs every second -- confirm intent.
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ // This frees the main thread and will cause the master to exit
+ clientService.stop();
+ Master.this.nextEvent.event("stopped event loop");
+ }
+
+ }, 100l, 1000l);
+ }
+
+ // oldState != newState always holds here because of the early return above.
+ if (oldState != newState && (newState == MasterState.HAVE_LOCK)) {
+ upgradeZookeeper();
+ }
+
+ if (oldState != newState && (newState == MasterState.NORMAL)) {
+ upgradeMetadata();
+ }
+ }
+
+ /**
+ * Upgrade step: relocates the root tablet from the metadata table's directory to the root table's
+ * directory and records its location in ZooKeeper. Does nothing if the ZooKeeper root-tablet node
+ * already exists.
+ *
+ * @param zoo ZooKeeper reader/writer used for the existence check and the final write
+ * @throws IOException if the filesystem rename fails
+ * @throws IllegalStateException if the root tablet is found in no location, or in more than one
+ */
+ private void moveRootTabletToRootTable(IZooReaderWriter zoo) throws Exception {
+ String dirZPath = ZooUtil.getRoot(instance) + RootTable.ZROOT_TABLET_PATH;
+
+ if (!zoo.exists(dirZPath)) {
+ Path oldPath = fs.getFullPath(FileType.TABLE, "/" + MetadataTable.ID + "/root_tablet");
+ if (fs.exists(oldPath)) {
+ // NOTE(review): newPath is created first, so the rename presumably moves the old directory
+ // *into* newPath (rename-into-existing-dir semantics) -- confirm against VolumeManager.rename.
+ String newPath = fs.choose(ServerConstants.getTablesDirs()) + "/" + RootTable.ID;
+ fs.mkdirs(new Path(newPath));
+ if (!fs.rename(oldPath, new Path(newPath))) {
+ throw new IOException("Failed to move root tablet from " + oldPath + " to " + newPath);
+ }
+
+ log.info("Upgrade renamed " + oldPath + " to " + newPath);
+ }
+
+ Path location = null;
+
+ // Scan every configured tables directory; exactly one must hold the root tablet.
+ for (String basePath : ServerConstants.getTablesDirs()) {
+ Path path = new Path(basePath + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION);
+ if (fs.exists(path)) {
+ if (location != null) {
+ throw new IllegalStateException("Root table at multiple locations " + location + " " + path);
+ }
+
+ location = path;
+ }
+ }
+
+ if (location == null)
+ throw new IllegalStateException("Failed to find root tablet");
+
+ log.info("Upgrade setting root table location in zookeeper " + location);
+ // NOTE(review): getBytes() uses the platform charset; elsewhere this file encodes with Constants.UTF8.
+ zoo.putPersistentData(dirZPath, location.toString().getBytes(), NodeExistsPolicy.FAIL);
+ }
+ }
+
+ /**
+ * One-time ZooKeeper upgrade, run when the persistent data version equals PREV_DATA_VERSION.
+ *
+ * In order, it:
+ * - creates the accumulo and default namespaces;
+ * - creates the root table's state and configuration;
+ * - grants the root user ALTER_TABLE on the root table;
+ * - assigns existing tables to namespaces;
+ * - renames the metadata table;
+ * - moves the root tablet;
+ * - adds namespace permissions for existing users.
+ *
+ * Any failure is logged as fatal and terminates the process with System.exit(1).
+ */
+ private void upgradeZookeeper() {
+ // 1.5.1 and 1.6.0 both do some state checking after obtaining the zoolock for the
+ // monitor and before starting up. It's not tied to the data version at all (and would
+ // introduce unnecessary complexity to try to make the master do it), but be aware
+ // that the master is not the only thing that may alter zookeeper before starting.
+
+ if (Accumulo.getAccumuloPersistentVersion(fs) == ServerConstants.PREV_DATA_VERSION) {
+ try {
+ log.info("Upgrading zookeeper");
+
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+
+ // create initial namespaces
+ String namespaces = ZooUtil.getRoot(instance) + Constants.ZNAMESPACES;
+ zoo.putPersistentData(namespaces, new byte[0], NodeExistsPolicy.SKIP);
+ for (Pair<String,String> namespace : Iterables.concat(
+ Collections.singleton(new Pair<String,String>(Namespaces.ACCUMULO_NAMESPACE, Namespaces.ACCUMULO_NAMESPACE_ID)),
+ Collections.singleton(new Pair<String,String>(Namespaces.DEFAULT_NAMESPACE, Namespaces.DEFAULT_NAMESPACE_ID)))) {
+ String ns = namespace.getFirst();
+ String id = namespace.getSecond();
+ log.debug("Upgrade creating namespace \"" + ns + "\" (ID: " + id + ")");
+ if (!Namespaces.exists(instance, id))
+ TableManager.prepareNewNamespaceState(instance.getInstanceID(), id, ns, NodeExistsPolicy.SKIP);
+ }
+
+ // create root table
+ log.debug("Upgrade creating table " + RootTable.NAME + " (ID: " + RootTable.ID + ")");
+ TableManager.prepareNewTableState(instance.getInstanceID(), RootTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, RootTable.NAME, TableState.ONLINE,
+ NodeExistsPolicy.SKIP);
+ Initialize.initMetadataConfig(RootTable.ID);
+ // ensure root user can flush root table
+ security.grantTablePermission(SystemCredentials.get().toThrift(instance), security.getRootUsername(), RootTable.ID, TablePermission.ALTER_TABLE, Namespaces.ACCUMULO_NAMESPACE_ID);
+
+ // put existing tables in the correct namespaces: metadata/root -> accumulo, everything else -> default
+ String tables = ZooUtil.getRoot(instance) + Constants.ZTABLES;
+ for (String tableId : zoo.getChildren(tables)) {
+ String targetNamespace = (MetadataTable.ID.equals(tableId) || RootTable.ID.equals(tableId)) ? Namespaces.ACCUMULO_NAMESPACE_ID
+ : Namespaces.DEFAULT_NAMESPACE_ID;
+ log.debug("Upgrade moving table " + new String(zoo.getData(tables + "/" + tableId + Constants.ZTABLE_NAME, null), Constants.UTF8) + " (ID: "
+ + tableId + ") into namespace with ID " + targetNamespace);
+ zoo.putPersistentData(tables + "/" + tableId + Constants.ZTABLE_NAMESPACE, targetNamespace.getBytes(Constants.UTF8), NodeExistsPolicy.SKIP);
+ }
+
+ // rename metadata table
+ log.debug("Upgrade renaming table " + MetadataTable.OLD_NAME + " (ID: " + MetadataTable.ID + ") to " + MetadataTable.NAME);
+ zoo.putPersistentData(tables + "/" + MetadataTable.ID + Constants.ZTABLE_NAME, Tables.qualify(MetadataTable.NAME).getSecond().getBytes(Constants.UTF8),
+ NodeExistsPolicy.OVERWRITE);
+
+ moveRootTabletToRootTable(zoo);
+
+ // add system namespace permissions to existing users
+ ZKPermHandler perm = new ZKPermHandler();
+ perm.initialize(instance.getInstanceID(), true);
+ String users = ZooUtil.getRoot(instance) + "/users";
+ for (String user : zoo.getChildren(users)) {
+ zoo.putPersistentData(users + "/" + user + "/Namespaces", new byte[0], NodeExistsPolicy.SKIP);
+ perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ);
+ }
+ // NOTE(review): "root" is hard-coded here, while the root-table grant above uses security.getRootUsername().
+ perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE);
+
+ } catch (Exception ex) {
+ log.fatal("Error performing upgrade", ex);
+ System.exit(1);
+ }
+ }
+ }
+
+ // Ensures upgradeMetadata starts its background task at most once (compareAndSet guard).
+ private final AtomicBoolean upgradeMetadataRunning = new AtomicBoolean(false);
+
+ // Server configuration supplied to the constructor; also passed to the tablet balancer's init.
+ private final ServerConfiguration serverConfig;
+
+ /**
+ * One-time metadata upgrade, run when the persistent data version equals PREV_DATA_VERSION.
+ * At most one upgrade task is started (guarded by upgradeMetadataRunning). It runs on a new thread,
+ * moves metadata delete markers, then updates the persisted Accumulo version.
+ * A failure is logged as fatal and terminates the process with System.exit(1).
+ */
+ private void upgradeMetadata() {
+ if (Accumulo.getAccumuloPersistentVersion(fs) == ServerConstants.PREV_DATA_VERSION) {
+ if (upgradeMetadataRunning.compareAndSet(false, true)) {
+ Runnable upgradeTask = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ MetadataTableUtil.moveMetaDeleteMarkers(instance, SystemCredentials.get());
+ Accumulo.updateAccumuloVersion(fs);
+
+ log.info("Upgrade complete");
+
+ } catch (Exception ex) {
+ log.fatal("Error performing upgrade", ex);
+ System.exit(1);
+ }
+
+ }
+ };
+
+ // need to run this in a separate thread because a lock is held that prevents metadata tablets from being assigned and this task writes to the
+ // metadata table
+ new Thread(upgradeTask).start();
+ }
+ }
+ }
+
+ /**
+ * @param tableId table to count
+ * @return hosted plus assigned tablets of the table, summed over all tablet group watchers
+ */
+ private int assignedOrHosted(Text tableId) {
+ int result = 0;
+ for (TabletGroupWatcher watcher : watchers) {
+ TableCounts count = watcher.getStats(tableId);
+ result += count.hosted() + count.assigned();
+ }
+ return result;
+ }
+
+ /**
+ * @return assigned plus hosted tablets across every table tracked by every watcher
+ */
+ private int totalAssignedOrHosted() {
+ int result = 0;
+ for (TabletGroupWatcher watcher : watchers) {
+ for (TableCounts counts : watcher.getStats().values()) {
+ result += counts.assigned() + counts.hosted();
+ }
+ }
+ return result;
+ }
+
+ /**
+ * @return assigned-or-hosted tablet count excluding the metadata and root tables
+ */
+ // NOTE(review): could reuse the METADATA_TABLE_ID / ROOT_TABLE_ID constants instead of allocating new Text objects.
+ private int nonMetaDataTabletsAssignedOrHosted() {
+ return totalAssignedOrHosted() - assignedOrHosted(new Text(MetadataTable.ID)) - assignedOrHosted(new Text(RootTable.ID));
+ }
+
+ /**
+ * @return tablets that are assigned but not hosted, plus tablets assigned to dead servers,
+ * summed over all tables and watchers
+ */
+ private int notHosted() {
+ int result = 0;
+ for (TabletGroupWatcher watcher : watchers) {
+ for (TableCounts counts : watcher.getStats().values()) {
+ result += counts.assigned() + counts.assignedToDeadServers();
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Counts the tablets that should be assigned but are not; the value is displayed on the monitor page.
+ *
+ * NORMAL: for ONLINE tables, counts unassigned + assigned-to-dead-servers + assigned.
+ * SAFE_MODE, UNLOAD_METADATA_TABLETS, UNLOAD_ROOT_TABLET: counts unassigned metadata-table tablets.
+ * Any other state: 0.
+ */
+ int displayUnassigned() {
+ int result = 0;
+ switch (getMasterState()) {
+ case NORMAL:
+ // Count offline tablets for online tables
+ for (TabletGroupWatcher watcher : watchers) {
+ TableManager manager = TableManager.getInstance();
+ for (Entry<Text,TableCounts> entry : watcher.getStats().entrySet()) {
+ Text tableId = entry.getKey();
+ TableCounts counts = entry.getValue();
+ TableState tableState = manager.getTableState(tableId.toString());
+ if (tableState != null && tableState.equals(TableState.ONLINE)) {
+ result += counts.unassigned() + counts.assignedToDeadServers() + counts.assigned();
+ }
+ }
+ }
+ break;
+ case SAFE_MODE:
+ // Count offline tablets for the metadata table
+ for (TabletGroupWatcher watcher : watchers) {
+ result += watcher.getStats(METADATA_TABLE_ID).unassigned();
+ }
+ break;
+ case UNLOAD_METADATA_TABLETS:
+ case UNLOAD_ROOT_TABLET:
+ for (TabletGroupWatcher watcher : watchers) {
+ result += watcher.getStats(METADATA_TABLE_ID).unassigned();
+ }
+ break;
+ default:
+ break;
+ }
+ return result;
+ }
+
+ /**
+ * Verifies that a table is ONLINE, after clearing the cached table information.
+ *
+ * @param tableId id of the table to check
+ * @throws ThriftTableOperationException with type OFFLINE if the table is not online
+ */
+ // NOTE(review): the exception always reports TableOperation.MERGE, regardless of the caller's operation.
+ public void mustBeOnline(final String tableId) throws ThriftTableOperationException {
+ Tables.clearCache(instance);
+ if (!Tables.getTableState(instance, tableId).equals(TableState.ONLINE))
+ throw new ThriftTableOperationException(tableId, null, TableOperation.MERGE, TableOperationExceptionType.OFFLINE, "table is not online");
+ }
+
+ /**
+ * @return a Connector for this instance, authenticated with the system credentials
+ */
+ public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
+ return instance.getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken());
+ }
+
+ /**
+ * Builds the master.
+ *
+ * Captures the configuration, instance, filesystem and hostname; sets the Thrift transport pool
+ * idle time from GENERAL_RPC_TIMEOUT; and creates the security operation and live tablet-server
+ * set. It also instantiates and initializes the balancer named by MASTER_TABLET_BALANCER
+ * (DefaultLoadBalancer is passed as the fallback instance).
+ */
+ private Master(ServerConfiguration config, VolumeManager fs, String hostname) throws IOException {
+ this.serverConfig = config;
+ this.instance = config.getInstance();
+ this.fs = fs;
+ this.hostname = hostname;
+
+ AccumuloConfiguration aconf = serverConfig.getConfiguration();
+
+ log.info("Version " + Constants.VERSION);
+ log.info("Instance " + instance.getInstanceID());
+ ThriftTransportPool.getInstance().setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+ security = AuditedSecurityOperation.getInstance();
+ // NOTE(review): `this` escapes to LiveTServerSet as its listener before construction finishes.
+ tserverSet = new LiveTServerSet(instance, config.getConfiguration(), this);
+ this.tabletBalancer = aconf.instantiateClassProperty(Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer());
+ this.tabletBalancer.init(serverConfig);
+ }
+
+ /**
+ * @param server the tablet server to reach
+ * @return the connection for that server, delegated to the live tablet-server set
+ */
+ public TServerConnection getConnection(TServerInstance server) {
+ return tserverSet.getConnection(server);
+ }
+
- public MergeInfo getMergeInfo(KeyExtent tablet) {
- return getMergeInfo(tablet.getTableId());
- }
-
+ /**
+ * Reads the merge request stored for a table in ZooKeeper under ZTABLES/{tableId}/merge,
+ * while holding mergeLock.
+ *
+ * @param tableId the table whose merge state is wanted
+ * @return the deserialized MergeInfo. An empty MergeInfo is returned if no merge node exists,
+ * if the node disappears mid-read, or on any other error (which is logged).
+ */
+ public MergeInfo getMergeInfo(Text tableId) {
+ synchronized (mergeLock) {
+ try {
+ String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge";
+ if (!ZooReaderWriter.getInstance().exists(path))
+ return new MergeInfo();
+ byte[] data = ZooReaderWriter.getInstance().getData(path, new Stat());
+ DataInputBuffer in = new DataInputBuffer();
+ in.reset(data, data.length);
+ MergeInfo info = new MergeInfo();
+ info.readFields(in);
+ return info;
+ } catch (KeeperException.NoNodeException ex) {
+ // The node can be deleted between exists() and getData() when a merge completes.
+ log.info("Error reading merge state, it probably just finished");
+ return new MergeInfo();
+ } catch (Exception ex) {
+ log.warn("Unexpected error reading merge state", ex);
+ return new MergeInfo();
+ }
+ }
+ }
+
+ /**
+ * Records a merge state transition in ZooKeeper, then fires a merge-state event.
+ *
+ * Mutates {@code info} by setting its state. A state of NONE deletes the merge node. STARTED
+ * creates it and fails if it already exists; any other state overwrites it. Waiters on
+ * mergeLock are notified.
+ *
+ * @param info merge description; its extent's table id selects the ZooKeeper node
+ * @param state the new merge state
+ */
+ public void setMergeState(MergeInfo info, MergeState state) throws IOException, KeeperException, InterruptedException {
+ synchronized (mergeLock) {
+ String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + info.getExtent().getTableId().toString() + "/merge";
+ info.setState(state);
+ if (state.equals(MergeState.NONE)) {
+ ZooReaderWriter.getInstance().recursiveDelete(path, NodeMissingPolicy.SKIP);
+ } else {
+ DataOutputBuffer out = new DataOutputBuffer();
+ try {
+ info.write(out);
+ } catch (IOException ex) {
+ throw new RuntimeException("Unlikely", ex);
+ }
+ // FAIL on STARTED so a second concurrent merge request for the same table is rejected.
+ ZooReaderWriter.getInstance().putPersistentData(path, out.getData(),
+ state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL : ZooUtil.NodeExistsPolicy.OVERWRITE);
+ }
+ mergeLock.notifyAll();
+ }
+ nextEvent.event("Merge state of %s set to %s", info.getExtent(), state);
+ }
+
+ /**
+ * Deletes a table's merge node from ZooKeeper (no error if it is already gone), notifies waiters
+ * on mergeLock, and fires a merge-cleared event.
+ *
+ * @param tableId the table whose merge state is cleared
+ */
+ public void clearMergeState(Text tableId) throws IOException, KeeperException, InterruptedException {
+ synchronized (mergeLock) {
+ String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge";
+ ZooReaderWriter.getInstance().recursiveDelete(path, NodeMissingPolicy.SKIP);
+ mergeLock.notifyAll();
+ }
+ nextEvent.event("Merge state of %s cleared", tableId);
+ }
+
+ /**
+ * Persists the master goal state (its enum name) to ZooKeeper, overwriting any previous value.
+ * Failures are logged and otherwise ignored.
+ */
+ // NOTE(review): the catch drops the exception cause from the log; getBytes() uses the platform charset.
+ void setMasterGoalState(MasterGoalState state) {
+ try {
+ ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMASTER_GOAL_STATE, state.name().getBytes(),
+ NodeExistsPolicy.OVERWRITE);
+ } catch (Exception ex) {
+ log.error("Unable to set master goal state in zookeeper");
+ }
+ }
+
+ /**
+ * Reads the master goal state from ZooKeeper. Retries indefinitely, logging each failure and
+ * sleeping 1000 between attempts, until a value is read and parsed.
+ */
+ MasterGoalState getMasterGoalState() {
+ while (true)
+ try {
+ byte[] data = ZooReaderWriter.getInstance().getData(ZooUtil.getRoot(instance) + Constants.ZMASTER_GOAL_STATE, null);
+ return MasterGoalState.valueOf(new String(data));
+ } catch (Exception e) {
+ log.error("Problem getting real goal state: " + e);
+ UtilWaitThread.sleep(1000);
+ }
+ }
+
+ /**
+ * @param time a timestamp comparable to watcher.stats.lastScanFinished()
+ * @return true if every tablet group watcher finished a scan at or after {@code time}
+ */
+ public boolean hasCycled(long time) {
+ for (TabletGroupWatcher watcher : watchers) {
+ if (watcher.stats.lastScanFinished() < time)
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Removes all pending migrations for tablets of the given table.
+ *
+ * @param tableId table id, compared against each extent's table id as a string
+ */
+ public void clearMigrations(String tableId) {
+ // Iterating a Collections.synchronizedSortedMap requires holding its monitor.
+ synchronized (migrations) {
+ Iterator<KeyExtent> iterator = migrations.keySet().iterator();
+ while (iterator.hasNext()) {
+ KeyExtent extent = iterator.next();
+ if (extent.getTableId().toString().equals(tableId)) {
+ iterator.remove();
+ }
+ }
+ }
+ }
+
+ // Desired placement of a tablet, as computed by getGoalState and its helpers.
+ static enum TabletGoalState {
+ HOSTED, UNASSIGNED, DELETED
+ };
+
+ /**
+ * Goal state imposed by the master's own state, independent of the tablet's table.
+ *
+ * NORMAL: every tablet is HOSTED.
+ * INITIAL, HAVE_LOCK, SAFE_MODE: only tablets whose extent isMeta() are HOSTED.
+ * UNLOAD_METADATA_TABLETS: only the root tablet is HOSTED.
+ * UNLOAD_ROOT_TABLET, STOP: every tablet is UNASSIGNED.
+ *
+ * @throws IllegalStateException for an unrecognized master state
+ */
+ TabletGoalState getSystemGoalState(TabletLocationState tls) {
+ switch (getMasterState()) {
+ case NORMAL:
+ return TabletGoalState.HOSTED;
+ case HAVE_LOCK: // fall-through intended
+ case INITIAL: // fall-through intended
+ case SAFE_MODE:
+ if (tls.extent.isMeta())
+ return TabletGoalState.HOSTED;
+ return TabletGoalState.UNASSIGNED;
+ case UNLOAD_METADATA_TABLETS:
+ if (tls.extent.isRootTablet())
+ return TabletGoalState.HOSTED;
+ return TabletGoalState.UNASSIGNED;
+ case UNLOAD_ROOT_TABLET:
+ return TabletGoalState.UNASSIGNED;
+ case STOP:
+ return TabletGoalState.UNASSIGNED;
+ default:
+ throw new IllegalStateException("Unknown Master State");
+ }
+ }
+
+ /**
+ * Goal state implied by the state of the tablet's table.
+ *
+ * Unknown table (null state) or DELETING: DELETED.
+ * OFFLINE or NEW: UNASSIGNED.
+ * Any other state: HOSTED.
+ */
+ TabletGoalState getTableGoalState(KeyExtent extent) {
+ TableState tableState = TableManager.getInstance().getTableState(extent.getTableId().toString());
+ if (tableState == null)
+ return TabletGoalState.DELETED;
+ switch (tableState) {
+ case DELETING:
+ return TabletGoalState.DELETED;
+ case OFFLINE:
+ case NEW:
+ return TabletGoalState.UNASSIGNED;
+ default:
+ return TabletGoalState.HOSTED;
+ }
+ }
+
+ /**
+ * Combines the system, merge, table and migration constraints into one goal state for a tablet.
+ *
+ * If the system goal is not HOSTED, that goal is returned. Otherwise, in order:
+ * - a tablet on a server being shut down is UNASSIGNED;
+ * - an active merge overlapping the tablet may force HOSTED or UNASSIGNED;
+ * - the table's goal state applies;
+ * - a HOSTED tablet with a pending migration to a different server is UNASSIGNED.
+ *
+ * @param tls the tablet's current location state
+ * @param mergeInfo merge request for the tablet's table; an extent of null means no merge
+ */
+ TabletGoalState getGoalState(TabletLocationState tls, MergeInfo mergeInfo) {
+ KeyExtent extent = tls.extent;
+ // Shutting down?
+ TabletGoalState state = getSystemGoalState(tls);
+ if (state == TabletGoalState.HOSTED) {
+ if (tls.current != null && serversToShutdown.contains(tls.current)) {
+ return TabletGoalState.UNASSIGNED;
+ }
+ // Handle merge transitions
+ if (mergeInfo.getExtent() != null) {
+ log.debug("mergeInfo overlaps: " + extent + " " + mergeInfo.overlaps(extent));
+ if (mergeInfo.overlaps(extent)) {
+ switch (mergeInfo.getState()) {
+ case NONE:
+ case COMPLETE:
+ break;
+ case STARTED:
+ case SPLITTING:
+ return TabletGoalState.HOSTED;
+ case WAITING_FOR_CHOPPED:
+ // A hosted tablet may be unloaded once chopped; a tablet that is not hosted
+ // additionally needs an empty write-ahead log list.
+ if (tls.getState(onlineTabletServers()).equals(TabletState.HOSTED)) {
+ if (tls.chopped)
+ return TabletGoalState.UNASSIGNED;
+ } else {
+ if (tls.chopped && tls.walogs.isEmpty())
+ return TabletGoalState.UNASSIGNED;
+ }
+
+ return TabletGoalState.HOSTED;
+ case WAITING_FOR_OFFLINE:
+ case MERGING:
+ return TabletGoalState.UNASSIGNED;
+ }
+ }
+ }
+
+ // taking table offline?
+ state = getTableGoalState(extent);
+ if (state == TabletGoalState.HOSTED) {
+ // Maybe this tablet needs to be migrated
+ TServerInstance dest = migrations.get(extent);
+ if (dest != null && tls.current != null && !dest.equals(tls.current)) {
+ return TabletGoalState.UNASSIGNED;
+ }
+ }
+ }
+ return state;
+ }
+
+  /**
+   * Daemon that, while this process is the master, periodically drops migrations whose tablets no longer exist in
+   * the metadata table, sleeping {@code TIME_BETWEEN_MIGRATION_CLEANUPS} between passes.
+   */
+  private class MigrationCleanupThread extends Daemon {
+
+    @Override
+    public void run() {
+      setName("Migration Cleanup Thread");
+      while (stillMaster()) {
+        if (!migrations.isEmpty()) {
+          try {
+            cleanupMutations();
+          } catch (Exception ex) {
+            log.error("Error cleaning up migrations", ex);
+          }
+        }
+        UtilWaitThread.sleep(TIME_BETWEEN_MIGRATION_CLEANUPS);
+      }
+    }
+
+    // If a migrating tablet splits, and the tablet dies before sending the
+    // master a message, the migration will refer to a non-existing tablet,
+    // so it can never complete. Periodically scan the metadata table and
+    // remove any migrating tablets that no longer exist.
+    //
+    // Despite its name, this method cleans up migrations, not mutations.
+    private void cleanupMutations() throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+      // Only migrations that already exist when the scan starts are candidates for removal. A migration added
+      // while the scan runs (e.g. by the balancer) may be for a row the scanner has already passed. It must not
+      // be discarded just because the scan never matched it.
+      Set<KeyExtent> missing;
+      synchronized (migrations) {
+        // copying iterates the key set, which must hold the map's monitor
+        missing = new HashSet<KeyExtent>(migrations.keySet());
+      }
+      if (missing.isEmpty())
+        return;
+
+      Connector connector = getConnector();
+      Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+      for (Entry<Key,Value> entry : scanner) {
+        // any extent still present in the metadata table is not stale
+        missing.remove(new KeyExtent(entry.getKey().getRow(), entry.getValue()));
+      }
+      migrations.keySet().removeAll(missing);
+    }
+  }
+
+  /**
+   * Daemon that drives the master state toward the master goal state and, when it is safe to do so, balances tablets.
+   * <p>
+   * Each pass does three things. It advances the master state; for CLEAN_STOP that is
+   * NORMAL -> SAFE_MODE -> UNLOAD_METADATA_TABLETS -> UNLOAD_ROOT_TABLET -> STOP. It then refreshes tablet server
+   * status via {@link #updateStatus()}. Finally it waits for the next event or for the interval that method returns.
+   */
+  private class StatusThread extends Daemon {
+
+    @Override
+    public void run() {
+      setName("Status Thread");
+      EventCoordinator.Listener eventListener = nextEvent.getListener();
+      while (stillMaster()) {
+        long wait = DEFAULT_WAIT_FOR_WATCHER;
+        try {
+          switch (getMasterGoalState()) {
+            case NORMAL:
+              setMasterState(MasterState.NORMAL);
+              break;
+            case SAFE_MODE:
+              if (getMasterState() == MasterState.NORMAL) {
+                setMasterState(MasterState.SAFE_MODE);
+              }
+              if (getMasterState() == MasterState.HAVE_LOCK) {
+                setMasterState(MasterState.SAFE_MODE);
+              }
+              break;
+            case CLEAN_STOP:
+              switch (getMasterState()) {
+                case NORMAL:
+                  setMasterState(MasterState.SAFE_MODE);
+                  break;
+                case SAFE_MODE: {
+                  int count = nonMetaDataTabletsAssignedOrHosted();
+                  log.debug(String.format("There are %d non-metadata tablets assigned or hosted", count));
+                  if (count == 0)
+                    setMasterState(MasterState.UNLOAD_METADATA_TABLETS);
+                }
+                  break;
+                case UNLOAD_METADATA_TABLETS: {
+                  int count = assignedOrHosted(METADATA_TABLE_ID);
+                  log.debug(String.format("There are %d metadata tablets assigned or hosted", count));
+                  if (count == 0)
+                    setMasterState(MasterState.UNLOAD_ROOT_TABLET);
+                }
+                  break;
+                case UNLOAD_ROOT_TABLET: {
+                  int count = assignedOrHosted(METADATA_TABLE_ID);
+                  if (count > 0) {
+                    log.debug(String.format("%d metadata tablets online", count));
+                    // NOTE(review): this re-sets the state the master is already in; confirm whether stepping back to
+                    // UNLOAD_METADATA_TABLETS was intended.
+                    setMasterState(MasterState.UNLOAD_ROOT_TABLET);
+                  }
+                  int root_count = assignedOrHosted(ROOT_TABLE_ID);
+                  if (root_count > 0)
+                    log.debug("The root tablet is still assigned or hosted");
+                  if (count + root_count == 0) {
+                    Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
+                    log.debug("stopping " + currentServers.size() + " tablet servers");
+                    for (TServerInstance server : currentServers) {
+                      try {
+                        serversToShutdown.add(server);
+                        // getConnection may return null (the other callers in this file check for it). Skip the halt
+                        // rather than abort the loop with a NullPointerException, which would leave the remaining
+                        // servers running.
+                        TServerConnection connection = tserverSet.getConnection(server);
+                        if (connection != null)
+                          connection.fastHalt(masterLock);
+                      } catch (TException e) {
+                        // its probably down, and we don't care
+                      } finally {
+                        tserverSet.remove(server);
+                      }
+                    }
+                    if (currentServers.size() == 0)
+                      setMasterState(MasterState.STOP);
+                  }
+                }
+                  break;
+                default:
+                  break;
+              }
+          }
+          wait = updateStatus();
+          eventListener.waitForEvents(wait);
+        } catch (Throwable t) {
+          log.error("Error balancing tablets", t);
+          UtilWaitThread.sleep(WAIT_BETWEEN_ERRORS);
+        }
+      }
+    }
+
+    /**
+     * Refreshes {@code tserverStatus}, halts a server stuck holding commits, and balances only when nothing argues
+     * against it.
+     *
+     * @return how long to wait before the next pass: the balancer's answer, or DEFAULT_WAIT_FOR_WATCHER if not balancing
+     */
+    private long updateStatus() throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+      tserverStatus = Collections.synchronizedSortedMap(gatherTableInformation());
+      checkForHeldServer(tserverStatus);
+
+      if (!badServers.isEmpty()) {
+        log.debug("not balancing because the balance information is out-of-date " + badServers.keySet());
+      } else if (notHosted() > 0) {
+        log.debug("not balancing because there are unhosted tablets");
+      } else if (getMasterGoalState() == MasterGoalState.CLEAN_STOP) {
+        log.debug("not balancing because the master is attempting to stop cleanly");
+      } else if (!serversToShutdown.isEmpty()) {
+        log.debug("not balancing while shutting down servers " + serversToShutdown);
+      } else {
+        return balanceTablets();
+      }
+      return DEFAULT_WAIT_FOR_WATCHER;
+    }
+
+    /**
+     * Halts a tablet server whose hold time exceeds TSERV_HOLD_TIME_SUICIDE. This only happens when it is the only
+     * server holding at all and at least one other server exists.
+     */
+    private void checkForHeldServer(SortedMap<TServerInstance,TabletServerStatus> tserverStatus) {
+      TServerInstance instance = null;
+      int crazyHoldTime = 0;
+      int someHoldTime = 0;
+      final long maxWait = getSystemConfiguration().getTimeInMillis(Property.TSERV_HOLD_TIME_SUICIDE);
+      for (Entry<TServerInstance,TabletServerStatus> entry : tserverStatus.entrySet()) {
+        if (entry.getValue().getHoldTime() > 0) {
+          someHoldTime++;
+          if (entry.getValue().getHoldTime() > maxWait) {
+            instance = entry.getKey();
+            crazyHoldTime++;
+          }
+        }
+      }
+      if (crazyHoldTime == 1 && someHoldTime == 1 && tserverStatus.size() > 1) {
+        log.warn("Tablet server " + instance + " exceeded maximum hold time: attempting to kill it");
+        try {
+          TServerConnection connection = tserverSet.getConnection(instance);
+          if (connection != null)
+            connection.fastHalt(masterLock);
+        } catch (TException e) {
+          log.error(e, e);
+        }
+        tserverSet.remove(instance);
+      }
+    }
+
+    /**
+     * Asks the balancer for migrations. It records those that pass the sanity check and are not already pending,
+     * then posts an event if any were requested.
+     *
+     * @return the wait interval suggested by the balancer
+     */
+    private long balanceTablets() {
+      List<TabletMigration> migrationsOut = new ArrayList<TabletMigration>();
+      Set<KeyExtent> migrationsCopy = new HashSet<KeyExtent>();
+      synchronized (migrations) {
+        migrationsCopy.addAll(migrations.keySet());
+      }
+      long wait = tabletBalancer.balance(Collections.unmodifiableSortedMap(tserverStatus), Collections.unmodifiableSet(migrationsCopy), migrationsOut);
+
+      for (TabletMigration m : TabletBalancer.checkMigrationSanity(tserverStatus.keySet(), migrationsOut)) {
+        if (migrations.containsKey(m.tablet)) {
+          log.warn("balancer requested migration more than once, skipping " + m);
+          continue;
+        }
+        migrations.put(m.tablet, m.newServer);
+        log.debug("migration " + m);
+      }
+      if (migrationsOut.size() > 0) {
+        nextEvent.event("Migrating %d more tablets, %d total", migrationsOut.size(), migrations.size());
+      }
+      return wait;
+    }
+
+  }
+
+ /**
+ * Polls every current tablet server for its status.
+ * <p>
+ * A failure increments that server's counter in badServers. Once the counter exceeds MAX_BAD_STATUS_COUNT, the
+ * master asks the server to halt and removes it from the live set. Afterwards badServers keeps only servers that
+ * are still current and did not answer this round.
+ *
+ * @return status of each server that responded, keyed by server
+ */
+ private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation() {
+ long start = System.currentTimeMillis();
+ SortedMap<TServerInstance,TabletServerStatus> result = new TreeMap<TServerInstance,TabletServerStatus>();
+ Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
+ for (TServerInstance server : currentServers) {
+ try {
+ Thread t = Thread.currentThread();
+ String oldName = t.getName();
+ try {
+ // the thread is renamed while talking to this server and restored in the finally below
+ t.setName("Getting status from " + server);
+ TServerConnection connection = tserverSet.getConnection(server);
+ if (connection == null)
+ throw new IOException("No connection to " + server);
+ TabletServerStatus status = connection.getTableMap(false);
+ result.put(server, status);
+ } finally {
+ t.setName(oldName);
+ }
+ } catch (Exception ex) {
+ log.error("unable to get tablet server status " + server + " " + ex.toString());
+ log.debug("unable to get tablet server status " + server, ex);
+ // NOTE(review): assumes badServers.get() yields a fresh counter for unknown servers (a default map); otherwise this would NPE
+ if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) {
+ log.warn("attempting to stop " + server);
+ try {
+ TServerConnection connection = tserverSet.getConnection(server);
+ if (connection != null)
+ connection.halt(masterLock);
+ } catch (TTransportException e) {
+ // ignore: it's probably down
+ } catch (Exception e) {
+ log.info("error talking to troublesome tablet server ", e);
+ }
+ badServers.remove(server);
+ tserverSet.remove(server);
+ }
+ }
+ }
+ // forget servers that went away or answered this round
+ synchronized (badServers) {
+ badServers.keySet().retainAll(currentServers);
+ badServers.keySet().removeAll(result.keySet());
+ }
+ log.debug(String.format("Finished gathering information from %d servers in %.2f seconds", result.size(), (System.currentTimeMillis() - start) / 1000.));
+ return result;
+ }
+
+  /**
+   * Main body of the master process.
+   * <p>
+   * Startup happens in this order:
+   * <ul>
+   * <li>acquire the master lock;</li>
+   * <li>start the status and migration-cleanup threads;</li>
+   * <li>initialize FATE with a ZooKeeper-backed age-off store;</li>
+   * <li>watch the recovery node;</li>
+   * <li>start the three tablet group watchers and the client service.</li>
+   * </ul>
+   * The method then blocks until the client service stops serving. It shuts down FATE and waits, up to
+   * MAX_CLEANUP_WAIT_TIME in total, for the status thread and the watchers.
+   *
+   * @throws IOException
+   *           if FATE initialization fails (ZooKeeper/interrupt errors there are wrapped)
+   * @throws InterruptedException
+   *           if interrupted while acquiring the lock or joining threads
+   * @throws KeeperException
+   *           on ZooKeeper errors while acquiring the lock or registering the recovery watcher
+   */
+  public void run() throws IOException, InterruptedException, KeeperException {
+    final String zroot = ZooUtil.getRoot(instance);
+
+    getMasterLock(zroot + Constants.ZMASTER_LOCK);
+
+    recoveryManager = new RecoveryManager(this);
+
+    TableManager.getInstance().addObserver(this);
+
+    StatusThread statusThread = new StatusThread();
+    statusThread.start();
+
+    MigrationCleanupThread migrationCleanupThread = new MigrationCleanupThread();
+    migrationCleanupThread.start();
+
+    tserverSet.startListeningForTabletServerChanges();
+
+    try {
+      // transactions are aged off after 8 hours (presumably milliseconds; confirm AgeOffStore's unit)
+      final AgeOffStore<Master> store = new AgeOffStore<Master>(new org.apache.accumulo.fate.ZooStore<Master>(ZooUtil.getRoot(instance) + Constants.ZFATE,
+          ZooReaderWriter.getRetryingInstance()), 1000 * 60 * 60 * 8);
+
+      int threads = this.getConfiguration().getConfiguration().getCount(Property.MASTER_FATE_THREADPOOL_SIZE);
+
+      fate = new Fate<Master>(this, store, threads);
+
+      SimpleTimer.getInstance().schedule(new Runnable() {
+
+        @Override
+        public void run() {
+          store.ageOff();
+        }
+      }, 63000, 63000);
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+
+    ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        // the event type is a format argument, so the message needs a placeholder for it to appear
+        nextEvent.event("Noticed recovery changes %s", event.getType());
+        try {
+          // watcher only fires once, add it back
+          ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, this);
+        } catch (Exception e) {
+          log.error("Failed to add log recovery watcher back", e);
+        }
+      }
+    });
+
+    // each watcher after the first is given the previous one as its dependent watcher
+    Credentials systemCreds = SystemCredentials.get();
+    watchers.add(new TabletGroupWatcher(this, new MetaDataStateStore(instance, systemCreds, this), null));
+    watchers.add(new TabletGroupWatcher(this, new RootTabletStateStore(instance, systemCreds, this), watchers.get(0)));
+    watchers.add(new TabletGroupWatcher(this, new ZooTabletStateStore(new ZooStore(zroot)), watchers.get(1)));
+    for (TabletGroupWatcher watcher : watchers) {
+      watcher.start();
+    }
+
+    Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new MasterClientServiceHandler(this)));
+    ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master",
+        "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
+    clientService = sa.server;
+    String address = sa.address.toString();
+    log.info("Setting master lock data to " + address);
+    masterLock.replaceLockData(address.getBytes());
+
+    // wait for the client service to come up, then block for as long as it keeps serving
+    while (!clientService.isServing()) {
+      UtilWaitThread.sleep(100);
+    }
+    while (clientService.isServing()) {
+      UtilWaitThread.sleep(500);
+    }
+    log.info("Shutting down fate.");
+    fate.shutdown();
+
+    final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
+    statusThread.join(remaining(deadline));
+
+    // quit, even if the tablet servers somehow jam up and the watchers
+    // don't stop
+    for (TabletGroupWatcher watcher : watchers) {
+      watcher.join(remaining(deadline));
+    }
+    log.info("exiting");
+  }
+
+  /**
+   * Milliseconds left until {@code deadline}, never less than 1. The result is used as a {@code join} timeout, and a
+   * timeout of 0 would mean "wait forever".
+   */
+  private long remaining(long deadline) {
+    long left = deadline - System.currentTimeMillis();
+    return left < 1 ? 1 : left;
+  }
+
+  /** @return the ZooKeeper lock object created while acquiring the master lock */
+  public ZooLock getMasterLock() {
+    return this.masterLock;
+  }
+
+ private static class MasterLockWatcher implements ZooLock.AsyncLockWatcher {
+
+ boolean acquiredLock = false;
+ boolean failedToAcquireLock = false;
+
+ @Override
+ public void lostLock(LockLossReason reason) {
+ Halt.halt("Master lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
+ }
+
+ @Override
+ public void unableToMonitorLockNode(final Throwable e) {
+ Halt.halt(-1, new Runnable() {
+ @Override
+ public void run() {
+ log.fatal("No longer able to monitor master lock node", e);
+ }
+ });
+
+ }
+
+ @Override
+ public synchronized void acquiredLock() {
+ log.debug("Acquired master lock");
+
+ if (acquiredLock || failedToAcquireLock) {
+ Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1);
+ }
+
+ acquiredLock = true;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void failedToAcquireLock(Exception e) {
+ log.warn("Failed to get master lock " + e);
+
+ if (acquiredLock) {
+ Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, -1);
+ }
+
+ failedToAcquireLock = true;
+ notifyAll();
+ }
+
+ public synchronized void waitForChange() {
+ while (!acquiredLock && !failedToAcquireLock) {
+ try {
+ wait();
+ } catch (InterruptedException e) {}
+ }
+ }
+ }
+
+  /**
+   * Blocks until this process holds the master lock at {@code zMasterLoc}. After each failed attempt it cancels that
+   * attempt and retries after TIME_TO_WAIT_BETWEEN_LOCK_CHECKS. Once the lock is held, the master enters HAVE_LOCK.
+   *
+   * @throws IllegalStateException
+   *           if the watcher reports neither success nor failure
+   */
+  private void getMasterLock(final String zMasterLoc) throws KeeperException, InterruptedException {
+    log.info("trying to get master lock");
+
+    final String masterClientAddress = hostname + ":" + getSystemConfiguration().getPort(Property.MASTER_CLIENTPORT);
+
+    boolean locked = false;
+    while (!locked) {
+      MasterLockWatcher watcher = new MasterLockWatcher();
+      masterLock = new ZooLock(zMasterLoc);
+      masterLock.lockAsync(watcher, masterClientAddress.getBytes());
+      watcher.waitForChange();
+
+      if (watcher.acquiredLock) {
+        locked = true;
+      } else if (watcher.failedToAcquireLock) {
+        masterLock.tryToCancelAsyncLockOrUnlock();
+        UtilWaitThread.sleep(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS);
+      } else {
+        throw new IllegalStateException("master lock in unknown state");
+      }
+    }
+
+    setMasterState(MasterState.HAVE_LOCK);
+  }
+
+  /**
+   * Process entry point. It logs in, parses the "master" server options, initializes the server environment and runs
+   * the master. Any exception is logged and the JVM exits with status 1.
+   */
+  public static void main(String[] args) throws Exception {
+    try {
+      SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
+
+      final VolumeManager volumes = VolumeManagerImpl.get();
+      final ServerOpts serverOpts = new ServerOpts();
+      serverOpts.parseArgs("master", args);
+      final String host = serverOpts.getAddress();
+      final Instance zooInstance = HdfsZooInstance.getInstance();
+      final ServerConfiguration serverConf = new ServerConfiguration(zooInstance);
+      Accumulo.init(volumes, serverConf, "master");
+      final Master master = new Master(serverConf, volumes, host);
+      Accumulo.enableTracing(host, "master");
+      master.run();
+    } catch (Exception ex) {
+      log.error("Unexpected exception, exiting", ex);
+      System.exit(1);
+    }
+  }
+
+ /**
+ * LiveTServerSet listener callback, invoked with the servers that were just added and deleted.
+ * <p>
+ * It does the following:
+ * <ul>
+ * <li>maintains the dead-server list in ZooKeeper;</li>
+ * <li>warns about unexpected losses;</li>
+ * <li>prunes the shutdown and bad-server bookkeeping;</li>
+ * <li>cancels migrations whose destination was deleted;</li>
+ * <li>posts an event with the new server count.</li>
+ * </ul>
+ */
+ @Override
+ public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
+ DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADTSERVERS);
+ if (added.size() > 0) {
+ log.info("New servers: " + added);
+ // a host:port that came back is no longer listed as dead
+ for (TServerInstance up : added)
+ obit.delete(up.hostPort());
+ }
+ for (TServerInstance dead : deleted) {
+ String cause = "unexpected failure";
+ if (serversToShutdown.contains(dead))
+ cause = "clean shutdown"; // maybe an incorrect assumption
+ // dead servers are not recorded while the whole cluster is stopping cleanly
+ if (!getMasterGoalState().equals(MasterGoalState.CLEAN_STOP))
+ obit.post(dead.hostPort(), cause);
+ }
+
+ Set<TServerInstance> unexpected = new HashSet<TServerInstance>(deleted);
+ unexpected.removeAll(this.serversToShutdown);
+ if (unexpected.size() > 0) {
+ if (stillMaster() && !getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)) {
+ log.warn("Lost servers " + unexpected);
+ }
+ }
+ serversToShutdown.removeAll(deleted);
+ badServers.keySet().removeAll(deleted);
+ // clear out any bad or shutting-down server with the same host/port as a server that was just added or deleted
+ synchronized (badServers) {
+ cleanListByHostAndPort(badServers.keySet(), deleted, added);
+ }
+ synchronized (serversToShutdown) {
+ cleanListByHostAndPort(serversToShutdown, deleted, added);
+ }
+
+ // a migration to a deleted server can never complete
+ synchronized (migrations) {
+ Iterator<Entry<KeyExtent,TServerInstance>> iter = migrations.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry<KeyExtent,TServerInstance> entry = iter.next();
+ if (deleted.contains(entry.getValue())) {
+ log.info("Canceling migration of " + entry.getKey() + " to " + entry.getValue());
+ iter.remove();
+ }
+ }
+ }
+ nextEvent.event("There are now %d tablet servers", current.size());
+ }
+
+  /**
+   * Removes, in place, every entry of {@code badServers} whose host:port matches a server in {@code added} or
+   * {@code deleted}.
+   *
+   * @param badServers
+   *          collection to prune; its iterator must support {@code remove()}
+   * @param deleted
+   *          servers that just went away
+   * @param added
+   *          servers that just appeared
+   */
+  private static void cleanListByHostAndPort(Collection<TServerInstance> badServers, Set<TServerInstance> deleted, Set<TServerInstance> added) {
+    Iterator<TServerInstance> badIter = badServers.iterator();
+    while (badIter.hasNext()) {
+      TServerInstance bad = badIter.next();
+      boolean stale = false;
+      for (TServerInstance add : added) {
+        if (bad.hostPort().equals(add.hostPort())) {
+          stale = true;
+          break;
+        }
+      }
+      if (!stale) {
+        for (TServerInstance del : deleted) {
+          if (bad.hostPort().equals(del.hostPort())) {
+            stale = true;
+            break;
+          }
+        }
+      }
+      // Remove at most once per next(). Iterator.remove() throws IllegalStateException on a second call. That happened
+      // when the same host:port appeared in both added and deleted, e.g. a new session replacing one that just died.
+      if (stale) {
+        badIter.remove();
+      }
+    }
+  }
+
+  /** Posts an event to the master's event coordinator when a table's state in ZooKeeper changes. */
+  @Override
+  public void stateChanged(String tableId, TableState state) {
+    final String fmt = "Table state in zookeeper changed for %s to %s";
+    nextEvent.event(fmt, tableId, state);
+  }
+
+ @Override
+ public void initialize(Map<String,TableState> tableIdToStateMap) {}
+
+ @Override
+ public void sessionExpired() {}
+
+  /**
+   * Returns the ids of tables whose tablets should currently be online.
+   * <p>
+   * In the NORMAL state this is every table known to the instance whose state is ONLINE. In any other state only the
+   * system tables are reported. The metadata table is left out only in UNLOAD_METADATA_TABLETS, and the root table
+   * only in UNLOAD_ROOT_TABLET.
+   */
+  @Override
+  public Set<String> onlineTables() {
+    Set<String> result = new HashSet<String>();
+    if (getMasterState() == MasterState.NORMAL) {
+      TableManager manager = TableManager.getInstance();
+      for (String tableId : Tables.getIdToNameMap(instance).keySet()) {
+        if (manager.getTableState(tableId) == TableState.ONLINE) {
+          result.add(tableId);
+        }
+      }
+      return result;
+    }
+    if (getMasterState() != MasterState.UNLOAD_METADATA_TABLETS)
+      result.add(MetadataTable.ID);
+    if (getMasterState() != MasterState.UNLOAD_ROOT_TABLET)
+      result.add(RootTable.ID);
+    return result;
+  }
+
+  /** @return the servers currently reported by the live tablet server set */
+  @Override
+  public Set<TServerInstance> onlineTabletServers() {
+    return this.tserverSet.getCurrentServers();
+  }
+
+  /**
+   * Returns one MergeInfo per table known to the instance, as reported by getMergeInfo. Callers treat entries whose
+   * getExtent() is null as "no merge".
+   */
+  @Override
+  public Collection<MergeInfo> merges() {
+    List<MergeInfo> infos = new ArrayList<MergeInfo>();
+    for (String id : Tables.getIdToNameMap(instance).keySet())
+      infos.add(getMergeInfo(new Text(id)));
+    return infos;
+  }
+
+  /**
+   * Posts an event about the request, then marks {@code server} for shutdown. This is used to recover state from the
+   * persistent transaction that shuts a server down.
+   */
+  public void shutdownTServer(TServerInstance server) {
+    this.nextEvent.event("Tablet Server shutdown requested for %s", server);
+    this.serversToShutdown.add(server);
+  }
+
+  /** @return the coordinator this master uses to post and wait for events */
+  public EventCoordinator getEventCoordinator() {
+    return this.nextEvent;
+  }
+
+  /** @return the Accumulo instance this master serves */
+  public Instance getInstance() {
+    return instance;
+  }
+
+ public AccumuloConfiguration getSystemConfiguration() {
+ return serverConfig.getConfiguration();
+ }
+
+  /** @return the server configuration this master was constructed with */
+  public ServerConfiguration getConfiguration() {
+    return this.serverConfig;
+  }
+
+  /** @return the volume manager used by this master */
+  public VolumeManager getFileSystem() {
+    return fs;
+  }
+
+  /**
+   * Steps a clean shutdown back when a tablet it already unloaded gets assigned again. This method is presumably
+   * invoked from the assignment path; verify the caller.
+   * <ul>
+   * <li>A tablet whose extent isMeta(), seen in UNLOAD_ROOT_TABLET, moves the master back to UNLOAD_METADATA_TABLETS.</li>
+   * <li>The root tablet, seen in STOP, moves it back to UNLOAD_ROOT_TABLET.</li>
+   * </ul>
+   */
+  public void assignedTablet(KeyExtent extent) {
+    if (extent.isMeta() && getMasterState().equals(MasterState.UNLOAD_ROOT_TABLET)) {
+      setMasterState(MasterState.UNLOAD_METADATA_TABLETS);
+    }
+    // probably too late, but try anyhow
+    if (extent.isRootTablet() && getMasterState().equals(MasterState.STOP)) {
+      setMasterState(MasterState.UNLOAD_ROOT_TABLET);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1392e07f/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 1590348,0000000..cc52e45
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@@ -1,742 -1,0 +1,747 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master;
+
+import static java.lang.Math.min;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.master.Master.TabletGoalState;
+import org.apache.accumulo.master.state.MergeStats;
+import org.apache.accumulo.master.state.TableCounts;
+import org.apache.accumulo.master.state.TableStats;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager.FileType;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.ClosableIterator;
+import org.apache.accumulo.server.master.state.DistributedStoreException;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.accumulo.server.master.state.TabletState;
+import org.apache.accumulo.server.master.state.TabletStateStore;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.tablets.TabletTime;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+
+import com.google.common.collect.Iterators;
+
+class TabletGroupWatcher extends Daemon {
+
+ // the master on whose behalf this watcher drives tablet state
+ private final Master master;
+ // tablet location store scanned on every pass of run()
+ final TabletStateStore store;
+ // may be null; during a full shutdown this watcher keeps its tablets hosted while this watcher still has tablets assigned or hosted
+ final TabletGroupWatcher dependentWatcher;
+
+ // per-table tablet state counts gathered during each scan (Master.hasCycled reads lastScanFinished() from it)
+ final TableStats stats = new TableStats();
+
+  /**
+   * @param master
+   *          the master this watcher works for
+   * @param store
+   *          the tablet location store to scan
+   * @param dependentWatcher
+   *          watcher consulted during a full shutdown (see run()); may be null
+   */
+  TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher) {
+    this.dependentWatcher = dependentWatcher;
+    this.store = store;
+    this.master = master;
+  }
+
+  /** @return per-table counts from the most recently completed scan */
+  Map<Text,TableCounts> getStats() {
+    Map<Text,TableCounts> last = stats.getLast();
+    return last;
+  }
+
+  /** @return counts for {@code tableId} from the most recently completed scan */
+  TableCounts getStats(Text tableId) {
+    TableCounts last = stats.getLast(tableId);
+    return last;
+  }
+
+ /**
+ * Main loop: while this process is still the master, repeatedly scans the store and moves each tablet toward
+ * the goal state from Master.getGoalState.
+ * <p>
+ * Each pass sends assignments, unloads, and chop/split requests in bounded batches. It then reports per-state
+ * counts and updates merge state. Failures are logged. A BadLocationStateException cause triggers a metadata
+ * repair; any other failure makes the loop sleep before retrying.
+ */
+ @Override
+ public void run() {
+
+ Thread.currentThread().setName("Watching " + store.name());
+ int[] oldCounts = new int[TabletState.values().length];
+ EventCoordinator.Listener eventListener = this.master.nextEvent.getListener();
+
+ while (this.master.stillMaster()) {
+ // slow things down a little, otherwise we spam the logs when there are many wake-up events
+ UtilWaitThread.sleep(100);
+
+ int totalUnloaded = 0;
+ int unloaded = 0;
+ ClosableIterator<TabletLocationState> iter = null;
+ try {
+ Map<Text,MergeStats> mergeStatsCache = new HashMap<Text,MergeStats>();
+ // ACCUMULO-2412: only merge requests that exist before the scan starts are used. Tables without one get an
+ // empty MergeInfo below, presumably so a merge requested mid-scan is not evaluated against a partial scan.
++ for (MergeInfo merge : master.merges()) {
++ if (merge.getExtent() != null) {
++ mergeStatsCache.put(merge.getExtent().getTableId(), new MergeStats(merge));
++ }
++ }
+
+ // Get the current status for the current list of tservers
+ SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<TServerInstance,TabletServerStatus>();
+ for (TServerInstance entry : this.master.tserverSet.getCurrentServers()) {
+ currentTServers.put(entry, this.master.tserverStatus.get(entry));
+ }
+
+ if (currentTServers.size() == 0) {
+ eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+ continue;
+ }
+
+ // Don't move tablets to servers that are shutting down
+ SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<TServerInstance,TabletServerStatus>(currentTServers);
+ destinations.keySet().removeAll(this.master.serversToShutdown);
+
+ List<Assignment> assignments = new ArrayList<Assignment>();
+ List<Assignment> assigned = new ArrayList<Assignment>();
+ List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
+ Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
+
+ // number of tablets seen in each TabletState during this scan
+ int[] counts = new int[TabletState.values().length];
+ stats.begin();
+ // Walk through the tablets in our store, and work tablets
+ // towards their goal
+ iter = store.iterator();
+ while (iter.hasNext()) {
+ TabletLocationState tls = iter.next();
+ if (tls == null) {
+ continue;
+ }
+ // ignore entries for tables that do not exist in zookeeper
+ if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
+ continue;
+
+ if (Master.log.isTraceEnabled())
+ Master.log.trace(tls + " walogs " + tls.walogs.size());
+
+ // Don't overwhelm the tablet servers with work
+ if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
+ flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+ assignments.clear();
+ assigned.clear();
+ assignedToDeadServers.clear();
+ unassigned.clear();
+ unloaded = 0;
+ eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+ }
+ Text tableId = tls.extent.getTableId();
+ MergeStats mergeStats = mergeStatsCache.get(tableId);
+ // a table with no pre-existing merge request is processed with an empty MergeInfo
+ if (mergeStats == null) {
- mergeStatsCache.put(tableId, mergeStats = new MergeStats(this.master.getMergeInfo(tls.extent)));
++ mergeStatsCache.put(tableId, mergeStats = new MergeStats(new MergeInfo()));
+ }
+ TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo());
+ TServerInstance server = tls.getServer();
+ TabletState state = tls.getState(currentTServers.keySet());
+ if (Master.log.isTraceEnabled())
+ Master.log.trace("Goal state " + goal + " current " + state);
+ stats.update(tableId, state);
+ mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
+ sendChopRequest(mergeStats.getMergeInfo(), state, tls);
+ sendSplitRequest(mergeStats.getMergeInfo(), state, tls);
+
+ // Always follow through with assignments
+ if (state == TabletState.ASSIGNED) {
+ goal = TabletGoalState.HOSTED;
+ }
+
+ // if we are shutting down all the tabletservers, we have to do it in order
+ if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) {
+ if (this.master.serversToShutdown.equals(currentTServers.keySet())) {
+ if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) {
+ goal = TabletGoalState.HOSTED;
+ }
+ }
+ }
+
+ if (goal == TabletGoalState.HOSTED) {
+ // a not-yet-hosted tablet with walogs goes through log recovery first; it is skipped this pass (and not
+ // counted) when recoverLogs returns true (presumably: recovery still in progress)
+ if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
+ if (this.master.recoveryManager.recoverLogs(tls.extent, tls.walogs))
+ continue;
+ }
+ switch (state) {
+ case HOSTED:
+ if (server.equals(this.master.migrations.get(tls.extent)))
+ this.master.migrations.remove(tls.extent);
+ break;
+ case ASSIGNED_TO_DEAD_SERVER:
+ assignedToDeadServers.add(tls);
+ if (server.equals(this.master.migrations.get(tls.extent)))
+ this.master.migrations.remove(tls.extent);
+ // log.info("Current servers " + currentTServers.keySet());
+ break;
+ case UNASSIGNED:
+ // maybe it's a finishing migration
+ TServerInstance dest = this.master.migrations.get(tls.extent);
+ if (dest != null) {
+ // if destination is still good, assign it
+ if (destinations.keySet().contains(dest)) {
+ assignments.add(new Assignment(tls.extent, dest));
+ } else {
+ // get rid of this migration
+ this.master.migrations.remove(tls.extent);
+ unassigned.put(tls.extent, server);
+ }
+ } else {
+ unassigned.put(tls.extent, server);
+ }
+ break;
+ case ASSIGNED:
+ // Send another reminder
+ assigned.add(new Assignment(tls.extent, tls.future));
+ break;
+ }
+ } else {
+ switch (state) {
+ case UNASSIGNED:
+ break;
+ case ASSIGNED_TO_DEAD_SERVER:
+ assignedToDeadServers.add(tls);
+ // log.info("Current servers " + currentTServers.keySet());
+ break;
+ case HOSTED:
+ // ask the hosting server to unload; the flag passed is false only when the goal is DELETED
+ TServerConnection conn = this.master.tserverSet.getConnection(server);
+ if (conn != null) {
+ conn.unloadTablet(this.master.masterLock, tls.extent, goal != TabletGoalState.DELETED);
+ unloaded++;
+ totalUnloaded++;
+ } else {
+ Master.log.warn("Could not connect to server " + server);
+ }
+ break;
+ case ASSIGNED:
+ break;
+ }
+ }
+ counts[state.ordinal()]++;
+ }
+
+ flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+
+ // provide stats after flushing changes to avoid race conditions w/ delete table
+ stats.end();
+
+ // Report changes
+ for (TabletState state : TabletState.values()) {
+ int i = state.ordinal();
+ if (counts[i] > 0 && counts[i] != oldCounts[i]) {
+ this.master.nextEvent.event("[%s]: %d tablets are %s", store.name(), counts[i], state.name());
+ }
+ }
+ Master.log.debug(String.format("[%s]: scan time %.2f seconds", store.name(), stats.getScanTime() / 1000.));
+ oldCounts = counts;
+ if (totalUnloaded > 0) {
+ this.master.nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded);
+ }
+
+ updateMergeState(mergeStatsCache);
+
+ Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
+ eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+ } catch (Exception ex) {
+ Master.log.error("Error processing table state for store " + store.name(), ex);
+ // a bad location entry triggers an attempt to repair that metadata row instead of the usual back-off
+ if (ex.getCause() != null && ex.getCause() instanceof BadLocationStateException) {
+ repairMetadata(((BadLocationStateException) ex.getCause()).getEncodedEndRow());
+ } else {
+ UtilWaitThread.sleep(Master.WAIT_BETWEEN_ERRORS);
+ }
+ } finally {
+ if (iter != null) {
+ try {
+ iter.close();
+ } catch (IOException ex) {
+ Master.log.warn("Error closing TabletLocationState iterator: " + ex, ex);
+ }
+ }
+ }
+ }
+ }
+
+  /**
+   * Attempts to repair a metadata row whose location columns conflict (ACCUMULO-2261).
+   * <p>
+   * Reads the current and future location entries of the row, from the root table when the row
+   * belongs to a metadata tablet and from the metadata table otherwise. If the row holds two or more
+   * location entries, the first entry naming a tablet server that is no longer alive is deleted.
+   * Nothing is thrown: every failure is logged.
+   *
+   * @param row the metadata row of the tablet whose location state was reported as bad
+   */
+  private void repairMetadata(Text row) {
+    Master.log.debug("Attempting repair on " + row);
+    // ACCUMULO-2261 if a dying tserver writes a location before its lock information propagates, it may cause duplicate assignment.
+    // Attempt to find the dead server entry and remove it.
+    try {
+      Map<Key, Value> future = new HashMap<Key, Value>();
+      Map<Key, Value> assigned = new HashMap<Key, Value>();
+      // only used to decide which system table holds this row; the prevRow value is a placeholder
+      KeyExtent extent = new KeyExtent(row, new Value(new byte[]{0}));
+      String table = MetadataTable.NAME;
+      if (extent.isMeta())
+        table = RootTable.NAME;
+      Scanner scanner = this.master.getConnector().createScanner(table, Authorizations.EMPTY);
+      scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+      scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+      scanner.setRange(new Range(row));
+      for (Entry<Key,Value> entry : scanner) {
+        if (entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) {
+          assigned.put(entry.getKey(), entry.getValue());
+        } else if (entry.getKey().getColumnFamily().equals(FutureLocationColumnFamily.NAME)) {
+          future.put(entry.getKey(), entry.getValue());
+        }
+      }
+      if (future.size() > 0 && assigned.size() > 0) {
+        Master.log.warn("Found a tablet assigned and hosted, attempting to repair");
+      } else if (future.size() > 1 && assigned.size() == 0) {
+        Master.log.warn("Found a tablet assigned to multiple servers, attempting to repair");
+      } else if (future.size() == 0 && assigned.size() > 1) {
+        Master.log.warn("Found a tablet hosted on multiple servers, attempting to repair");
+      } else {
+        Master.log.info("Attempted a repair, but nothing seems to be obviously wrong. " + assigned + " " + future);
+        return;
+      }
+      Iterator<Entry<Key, Value>> iter = Iterators.concat(future.entrySet().iterator(), assigned.entrySet().iterator());
+      while (iter.hasNext()) {
+        Entry<Key, Value> entry = iter.next();
+        TServerInstance alive = master.tserverSet.find(entry.getValue().toString());
+        if (alive == null) {
+          Master.log.info("Removing entry " + entry);
+          BatchWriter bw = this.master.getConnector().createBatchWriter(table, new BatchWriterConfig());
+          try {
+            Mutation m = new Mutation(entry.getKey().getRow());
+            m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
+            bw.addMutation(m);
+          } finally {
+            // release the writer even if building or adding the delete mutation fails
+            bw.close();
+          }
+          return;
+        }
+      }
+      Master.log.error("Metadata table is inconsistent at " + row + " and all assigned/future tservers are still online.");
+    } catch (Throwable e) {
+      Master.log.error("Error attempting repair of metadata " + row + ": " + e, e);
+    }
+  }
+
+  /** Returns the sum of the assigned and hosted tablet counts over every table in stats.getLast(). */
+  private int assignedOrHosted() {
+    int assignedPlusHosted = 0;
+    Iterator<TableCounts> perTable = stats.getLast().values().iterator();
+    while (perTable.hasNext()) {
+      TableCounts tableCounts = perTable.next();
+      assignedPlusHosted += tableCounts.assigned() + tableCounts.hosted();
+    }
+    return assignedPlusHosted;
+  }
+
+  /**
+   * For a delete-range request in the SPLITTING state, asks the server hosting this tablet to split
+   * it at each end point of the delete range that lies inside the tablet and is not already one of
+   * the tablet's own boundaries. Communication failures are logged, not thrown.
+   */
+  private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
+    // Only deletes split, only while splitting, and only tablets that are online right now.
+    boolean eligible = info.getState().equals(MergeState.SPLITTING) && info.isDelete() && state.equals(TabletState.HOSTED);
+    if (!eligible)
+      return;
+
+    KeyExtent deleteRange = info.getExtent();
+    if (!tls.extent.overlaps(deleteRange))
+      return;
+
+    Text[] endPoints = new Text[] {deleteRange.getPrevEndRow(), deleteRange.getEndRow()};
+    for (Text splitPoint : endPoints) {
+      // Skip open ends, points outside this tablet, and points that are already tablet boundaries.
+      boolean needsSplit = splitPoint != null && tls.extent.contains(splitPoint) && !splitPoint.equals(tls.extent.getEndRow())
+          && !splitPoint.equals(tls.extent.getPrevEndRow());
+      if (!needsSplit)
+        continue;
+
+      try {
+        TServerConnection conn = this.master.tserverSet.getConnection(tls.current);
+        if (conn == null) {
+          Master.log.warn("Not connected to server " + tls.current);
+        } else {
+          Master.log.info("Asking " + tls.current + " to split " + tls.extent + " at " + splitPoint);
+          conn.splitTablet(this.master.masterLock, tls.extent, splitPoint);
+        }
+      } catch (NotServingTabletException e) {
+        Master.log.debug("Error asking tablet server to split a tablet: " + e);
+      } catch (Exception e) {
+        Master.log.warn("Error asking tablet server to split a tablet: " + e);
+      }
+    }
+  }
+
+  /**
+   * Asks the server hosting a tablet to chop it. This happens only when the merge is in the
+   * WAITING_FOR_CHOPPED state, the tablet is hosted, the tablet is not yet chopped, and the merge
+   * info says the tablet needs to be chopped. Communication failures are logged, not thrown.
+   */
+  private void sendChopRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
+    // Don't bother if we're in the wrong state
+    if (!info.getState().equals(MergeState.WAITING_FOR_CHOPPED))
+      return;
+    // Tablet must be online
+    if (!state.equals(TabletState.HOSTED))
+      return;
+    // Skip tablets that are already chopped
+    if (tls.chopped)
+      return;
+    // Only tablets the merge says must be chopped
+    if (!info.needsToBeChopped(tls.extent))
+      return;
+    try {
+      TServerConnection conn = this.master.tserverSet.getConnection(tls.current);
+      if (conn != null) {
+        Master.log.info("Asking " + tls.current + " to chop " + tls.extent);
+        conn.chop(this.master.masterLock, tls.extent);
+      } else {
+        Master.log.warn("Could not connect to server " + tls.current);
+      }
+    } catch (TException e) {
+      // keep the cause and the target so a failed chop request can be diagnosed
+      Master.log.warn("Communications error asking tablet server " + tls.current + " to chop tablet " + tls.extent, e);
+    }
+  }
+
+  /**
+   * Advances every tracked merge to its next state and persists any change. A transition to
+   * COMPLETE is recorded as NONE. When a merge enters MERGING, the metadata work is performed
+   * (tablet deletion for delete requests, record merging otherwise) and the merge is then marked
+   * COMPLETE. Failures are logged per merge, so one failing merge does not stop the others.
+   *
+   * @param mergeStatsCache merge statistics collected by the caller
+   */
+  private void updateMergeState(Map<Text,MergeStats> mergeStatsCache) {
+    // named mergeStats so it does not shadow the 'stats' field
+    for (MergeStats mergeStats : mergeStatsCache.values()) {
+      try {
+        MergeState update = mergeStats.nextMergeState(this.master.getConnector(), this.master);
+        // when next state is MERGING, it's important to persist this before
+        // starting the merge... the verification check that is done before
+        // moving into the merging state could fail if merge starts but does
+        // not finish
+        if (update == MergeState.COMPLETE)
+          update = MergeState.NONE;
+        if (update != mergeStats.getMergeInfo().getState()) {
+          this.master.setMergeState(mergeStats.getMergeInfo(), update);
+        }
+
+        if (update == MergeState.MERGING) {
+          try {
+            if (mergeStats.getMergeInfo().isDelete()) {
+              deleteTablets(mergeStats.getMergeInfo());
+            } else {
+              mergeMetadataRecords(mergeStats.getMergeInfo());
+            }
+            this.master.setMergeState(mergeStats.getMergeInfo(), MergeState.COMPLETE);
+          } catch (Exception ex) {
+            Master.log.error("Unable to merge metadata table records for " + mergeStats.getMergeInfo().getExtent(), ex);
+          }
+        }
+      } catch (Exception ex) {
+        Master.log.error("Unable to update merge state for merge " + mergeStats.getMergeInfo().getExtent(), ex);
+      }
+    }
+  }
+
+  /**
+   * Carries out a delete-range request by removing the metadata of every tablet in the request's
+   * extent.
+   * <p>
+   * The affected rows are scanned first. This queues file deletion entries for each data file and
+   * tablet directory and records the tablets' time type. The rows are then deleted. If the extent
+   * has an end row, the tablet that follows it gets its prevRow rewritten to the extent's prevEndRow
+   * and its chopped marker removed. Otherwise a new last tablet is created whose prevEndRow is the
+   * extent's prevEndRow.
+   *
+   * @param info the delete request; its extent is the range of tablets to remove
+   * @throws AccumuloException if the following tablet cannot be located, or wrapping any other
+   *           failure (including an IllegalStateException when a tablet in the range still has a
+   *           current location)
+   */
+  private void deleteTablets(MergeInfo info) throws AccumuloException {
+    KeyExtent extent = info.getExtent();
+    String targetSystemTable = extent.isMeta() ? RootTable.NAME : MetadataTable.NAME;
+    Master.log.debug("Deleting tablets for " + extent);
+    // time type read from the deleted tablets; used if the last tablet has to be recreated
+    char timeType = '\0';
+    // the tablet just past the deleted range, which must take over the range's prevRow
+    KeyExtent followingTablet = null;
+    if (extent.getEndRow() != null) {
+      Key nextExtent = new Key(extent.getEndRow()).followingKey(PartialKey.ROW);
+      followingTablet = getHighTablet(new KeyExtent(extent.getTableId(), nextExtent.getRow(), extent.getEndRow()));
+      Master.log.debug("Found following tablet " + followingTablet);
+    }
+    try {
+      Connector conn = this.master.getConnector();
+      Text start = extent.getPrevEndRow();
+      if (start == null) {
+        start = new Text();
+      }
+      Master.log.debug("Making file deletion entries for " + extent);
+      // metadata rows of the tablets in the extent: excludes the row keyed by prevEndRow, includes the row keyed by the end row
+      Range deleteRange = new Range(KeyExtent.getMetadataEntry(extent.getTableId(), start), false, KeyExtent.getMetadataEntry(extent.getTableId(),
+          extent.getEndRow()), true);
+      Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
+      scanner.setRange(deleteRange);
+      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
+      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
+      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+      scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
+      Set<FileRef> datafiles = new TreeSet<FileRef>();
+      for (Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
+          datafiles.add(new FileRef(this.master.fs, key));
+          // write delete entries in batches so the set does not grow without bound
+          if (datafiles.size() > 1000) {
+            MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
+            datafiles.clear();
+          }
+        } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
+          timeType = entry.getValue().toString().charAt(0);
+        } else if (key.compareColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) {
+          // a tablet in the range still has a current location; refuse to delete its metadata
+          throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!");
+        } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
+          // the tablet directory is queued for deletion alongside its files
+          datafiles.add(new FileRef(entry.getValue().toString(), this.master.fs.getFullPath(FileType.TABLE, entry.getValue().toString())));
+          if (datafiles.size() > 1000) {
+            MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
+            datafiles.clear();
+          }
+        }
+      }
+      // flush the final partial batch of delete entries
+      MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
+      BatchWriter bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
+      try {
+        deleteTablets(info, deleteRange, bw, conn);
+      } finally {
+        bw.close();
+      }
+
+      if (followingTablet != null) {
+        // stitch the following tablet onto whatever preceded the deleted range
+        Master.log.debug("Updating prevRow of " + followingTablet + " to " + extent.getPrevEndRow());
+        bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
+        try {
+          Mutation m = new Mutation(followingTablet.getMetadataEntry());
+          TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(extent.getPrevEndRow()));
+          ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
+          bw.addMutation(m);
+          bw.flush();
+        } finally {
+          bw.close();
+        }
+      } else {
+        // Recreate the default tablet to hold the end of the table
+        Master.log.debug("Recreating the last tablet to point to " + extent.getPrevEndRow());
+        String tdir = master.getFileSystem().choose(ServerConstants.getTablesDirs()) + "/" + extent.getTableId() + Constants.DEFAULT_TABLET_LOCATION;
+        MetadataTableUtil.addTablet(new KeyExtent(extent.getTableId(), null, extent.getPrevEndRow()), tdir,
+            SystemCredentials.get(), timeType, this.master.masterLock);
+      }
+    } catch (Exception ex) {
+      throw new AccumuloException(ex);
+    }
+  }
+
+  /**
+   * Collapses the metadata of every tablet in a merge range into the highest tablet of that range.
+   * <p>
+   * Steps, in order:
+   * <ul>
+   * <li>Copy the data file entries of the lower tablets onto the highest tablet's row.</li>
+   * <li>Store the maximum logical time across the range (including the highest tablet) on that row.</li>
+   * <li>Queue the lower tablets' directories for deletion.</li>
+   * <li>Set the highest tablet's prevRow to the prevRow of the lowest tablet.</li>
+   * <li>Delete the lower tablets' rows and clear the highest tablet's chopped marker.</li>
+   * </ul>
+   *
+   * @param info the merge request whose extent defines the range to merge
+   * @throws AccumuloException if the highest tablet cannot be found, or wrapping any failure while
+   *           reading or writing the metadata
+   */
+  private void mergeMetadataRecords(MergeInfo info) throws AccumuloException {
+    KeyExtent range = info.getExtent();
+    Master.log.debug("Merging metadata for " + range);
+    KeyExtent stop = getHighTablet(range);
+    Master.log.debug("Highest tablet is " + stop);
+    Value firstPrevRowValue = null;
+    Text stopRow = stop.getMetadataEntry();
+    Text start = range.getPrevEndRow();
+    if (start == null) {
+      start = new Text();
+    }
+    // metadata rows of every tablet in the range except the highest one (stopRow is excluded)
+    Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, false);
+    String targetSystemTable = MetadataTable.NAME;
+    if (range.isMeta()) {
+      targetSystemTable = RootTable.NAME;
+    }
+
+    BatchWriter bw = null;
+    // set once every metadata update has been written; decides how a failure in close() is reported
+    boolean completed = false;
+    try {
+      long fileCount = 0;
+      Connector conn = this.master.getConnector();
+      // Make file entries in highest tablet
+      bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
+      Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
+      scanner.setRange(scanRange);
+      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
+      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
+      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+      Mutation m = new Mutation(stopRow);
+      String maxLogicalTime = null;
+      for (Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        Value value = entry.getValue();
+        if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+          m.put(key.getColumnFamily(), key.getColumnQualifier(), value);
+          fileCount++;
+        } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && firstPrevRowValue == null) {
+          // the first prevRow seen belongs to the lowest tablet in the range
+          Master.log.debug("prevRow entry for lowest tablet is " + value);
+          firstPrevRowValue = new Value(value);
+        } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
+          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString());
+        } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
+          bw.addMutation(MetadataTableUtil.createDeleteMutation(range.getTableId().toString(), entry.getValue().toString()));
+        }
+      }
+
+      // read the logical time from the last tablet in the merge range, it is not included in
+      // the loop above
+      scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
+      scanner.setRange(new Range(stopRow));
+      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
+      for (Entry<Key,Value> entry : scanner) {
+        if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
+          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, entry.getValue().toString());
+        }
+      }
+
+      if (maxLogicalTime != null)
+        TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(maxLogicalTime.getBytes()));
+
+      if (!m.getUpdates().isEmpty()) {
+        bw.addMutation(m);
+      }
+
+      bw.flush();
+
+      Master.log.debug("Moved " + fileCount + " files to " + stop);
+
+      if (firstPrevRowValue == null) {
+        Master.log.debug("tablet already merged");
+        completed = true;
+        return;
+      }
+
+      stop.setPrevEndRow(KeyExtent.decodePrevEndRow(firstPrevRowValue));
+      Mutation updatePrevRow = stop.getPrevRowUpdateMutation();
+      Master.log.debug("Setting the prevRow for last tablet: " + stop);
+      bw.addMutation(updatePrevRow);
+      bw.flush();
+
+      deleteTablets(info, scanRange, bw, conn);
+
+      // Clean-up the last chopped marker
+      m = new Mutation(stopRow);
+      ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
+      bw.addMutation(m);
+      bw.flush();
+
+      completed = true;
+    } catch (Exception ex) {
+      throw new AccumuloException(ex);
+    } finally {
+      if (bw != null) {
+        try {
+          bw.close();
+        } catch (Exception ex) {
+          // Throwing from a finally block discards any exception already propagating, so a close()
+          // failure is only surfaced when the merge itself succeeded; otherwise it is just logged.
+          if (completed)
+            throw new AccumuloException(ex);
+          Master.log.warn("Unable to close batch writer after failed merge of " + range, ex);
+        }
+      }
+    }
+  }
+
+  /**
+   * Deletes every metadata entry in {@code scanRange}, then flushes the writer. All deletes for a
+   * tablet row are grouped into a single Mutation, so a tablet either disappears entirely or not at
+   * all. This matters if the process terminates partway through the loop.
+   *
+   * @param info the merge/delete request; its extent selects the root or metadata table
+   * @param scanRange the metadata rows to delete
+   * @param bw writer the deletes are added to (flushed, not closed)
+   * @param conn connector used to scan the rows
+   */
+  private void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException, MutationsRejectedException {
+    String systemTable = info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME;
+    Scanner rowScanner = conn.createScanner(systemTable, Authorizations.EMPTY);
+    Master.log.debug("Deleting range " + scanRange);
+    rowScanner.setRange(scanRange);
+    for (RowIterator rows = new RowIterator(rowScanner); rows.hasNext();) {
+      Iterator<Entry<Key,Value>> columns = rows.next();
+      Mutation rowDelete = null;
+      while (columns.hasNext()) {
+        Key key = columns.next().getKey();
+        if (rowDelete == null)
+          rowDelete = new Mutation(key.getRow());
+        rowDelete.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+        Master.log.debug("deleting entry " + key);
+      }
+      bw.addMutation(rowDelete);
+    }
+    bw.flush();
+  }
+
+  /**
+   * Finds the tablet whose metadata row is the first one at or after the metadata row for the given
+   * range's end row, and checks that it belongs to the same table.
+   *
+   * @param range the extent being merged or deleted
+   * @return that tablet's extent, with its prevRow decoded from the metadata entry
+   * @throws AccumuloException if no such tablet exists in the range's table, or wrapping any failure
+   *           while scanning
+   */
+  private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
+    try {
+      Connector conn = this.master.getConnector();
+      Scanner scanner = conn.createScanner(range.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
+      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+      KeyExtent start = new KeyExtent(range.getTableId(), range.getEndRow(), null);
+      scanner.setRange(new Range(start.getMetadataEntry(), null));
+      Iterator<Entry<Key,Value>> iterator = scanner.iterator();
+      if (!iterator.hasNext()) {
+        throw new AccumuloException("No last tablet for a merge " + range);
+      }
+      Entry<Key,Value> entry = iterator.next();
+      KeyExtent highTablet = new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue()));
+      // Compare table ids by value: '!=' compares references and is only correct when both Text
+      // instances happen to be the same object.
+      if (!highTablet.getTableId().equals(range.getTableId())) {
+        throw new AccumuloException("No last tablet for merge " + range + " " + highTablet);
+      }
+      return highTablet;
+    } catch (Exception ex) {
+      throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex);
+    }
+  }
+
+  /**
+   * Applies the decisions gathered during a scan:
+   * <ol>
+   * <li>Tablets assigned to dead servers are marked unassigned.</li>
+   * <li>Unassigned tablets are offered to the balancer.</li>
+   * <li>Newly chosen destinations are stored as future locations.</li>
+   * <li>Every new assignment, plus every already-assigned tablet, is sent to its tablet server.</li>
+   * </ol>
+   */
+  private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
+      List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
+    if (assignedToDeadServers.size() > 0) {
+      int deadCount = assignedToDeadServers.size();
+      List<TabletLocationState> shown = assignedToDeadServers.subList(0, min(deadCount, 100));
+      Master.log.debug(deadCount + " assigned to dead servers: " + shown + "...");
+      store.unassign(assignedToDeadServers);
+      this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
+    }
+
+    if (!currentTServers.isEmpty()) {
+      Map<KeyExtent,TServerInstance> balancerChoices = new HashMap<KeyExtent,TServerInstance>();
+      this.master.tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers), Collections.unmodifiableMap(unassigned), balancerChoices);
+      for (Entry<KeyExtent,TServerInstance> choice : balancerChoices.entrySet()) {
+        KeyExtent tablet = choice.getKey();
+        TServerInstance destination = choice.getValue();
+        if (!unassigned.containsKey(tablet)) {
+          Master.log.warn(store.name() + " load balancer assigning tablet that was not nominated for assignment " + tablet);
+        } else if (destination == null) {
+          // no destination chosen for this tablet; nothing to record
+        } else if (!currentTServers.containsKey(destination)) {
+          Master.log.warn("balancer assigned " + tablet + " to a tablet server that is not current " + destination + " ignoring");
+        } else {
+          Master.log.debug(store.name() + " assigning tablet " + choice);
+          assignments.add(new Assignment(tablet, destination));
+        }
+      }
+      if (!unassigned.isEmpty() && balancerChoices.isEmpty())
+        Master.log.warn("Load balancer failed to assign any tablets");
+    }
+
+    if (!assignments.isEmpty()) {
+      Master.log.info(String.format("Assigning %d tablets", assignments.size()));
+      store.setFutureLocations(assignments);
+    }
+    // tablets already in the ASSIGNED state are re-sent as a reminder alongside the new ones
+    assignments.addAll(assigned);
+    for (Assignment pending : assignments) {
+      TServerConnection conn = this.master.tserverSet.getConnection(pending.server);
+      if (conn == null) {
+        Master.log.warn("Could not connect to server " + pending.server);
+      } else {
+        conn.assignTablet(this.master.masterLock, pending.tablet);
+      }
+      master.assignedTablet(pending.tablet);
+    }
+  }
+
+}