You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:56:27 UTC
[48/54] [partial] ACCUMULO-658,
ACCUMULO-656 Split server into separate modules
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
new file mode 100644
index 0000000..3eef065
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master;
+
+import static org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NotEmptyException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+import com.google.common.net.HostAndPort;
+
+public class LiveTServerSet implements Watcher {
+
+ /**
+ * Callback notified when the set of live tablet servers changes.
+ */
+ public interface Listener {
+ // called with this set, the instances that went away (deleted) and the instances that appeared (added)
+ void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added);
+ }
+
+ private static final Logger log = Logger.getLogger(LiveTServerSet.class);
+
+ private final Listener cback;
+ private final Instance instance;
+ private final AccumuloConfiguration conf;
+ private ZooCache zooCache;
+
+ /**
+ * Thrift client wrapper for one tablet server at a fixed address. Unless noted otherwise, each method obtains a client through ThriftUtil.getClient, issues a
+ * single RPC with the system credentials, and hands the client back through ThriftUtil.returnClient in a finally block.
+ */
+ public class TServerConnection {
+ private final HostAndPort address;
+
+ public TServerConnection(HostAndPort addr) throws TException {
+ address = addr;
+ }
+
+ // Serialized form of the given lock's id, relative to the master lock path; passed as the lock argument of the RPCs below
+ private String lockString(ZooLock mlock) {
+ return mlock.getLockID().serialize(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK);
+ }
+
+ // Asks the tablet server to load the given extent
+ public void assignTablet(ZooLock lock, KeyExtent extent) throws TException {
+ TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+ try {
+ client.loadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
+ } finally {
+ ThriftUtil.returnClient(client);
+ }
+ }
+
+ // Asks the tablet server to unload the given extent; the save flag is forwarded unchanged
+ public void unloadTablet(ZooLock lock, KeyExtent extent, boolean save) throws TException {
+ TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+ try {
+ client.unloadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift(), save);
+ } finally {
+ ThriftUtil.returnClient(client);
+ }
+ }
+
+ // Fetches the tablet server's status over a dedicated transport that is closed before returning.
+ // Only usePooledConnection == false is supported; true throws UnsupportedOperationException.
+ public TabletServerStatus getTableMap(boolean usePooledConnection) throws TException, ThriftSecurityException {
+
+ if (usePooledConnection == true)
+ throw new UnsupportedOperationException();
+
+ TTransport transport = ThriftUtil.createTransport(address, conf);
+
+ try {
+ TabletClientService.Client client = ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport);
+ return client.getTabletServerStatus(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance));
+ } finally {
+ if (transport != null)
+ transport.close();
+ }
+ }
+
+ // Sends the halt RPC to the tablet server
+ public void halt(ZooLock lock) throws TException, ThriftSecurityException {
+ TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+ try {
+ client.halt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock));
+ } finally {
+ ThriftUtil.returnClient(client);
+ }
+ }
+
+ // Sends the fastHalt RPC to the tablet server
+ public void fastHalt(ZooLock lock) throws TException {
+ TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+ try {
+ client.fastHalt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock));
+ } finally {
+ ThriftUtil.returnClient(client);
+ }
+ }
+
+ // Requests a flush of the given table for the row range; a null startRow or endRow is forwarded as null
+ public void flush(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException {
+ TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+ try {
+ client.flush(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId,
+ startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow));
+ } finally {
+ ThriftUtil.returnClient(client);
+ }
+ }
+
+ // Sends the chop RPC for the given extent
+ public void chop(ZooLock lock, KeyExtent extent) throws TException {
+ TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+ try {
+ client.chop(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
+ } finally {
+ ThriftUtil.returnClient(client);
+ }
+ }
+
+ // Requests a split of the extent at splitPoint. Note: the lock parameter is not used by this call.
+ // Only the first getLength() bytes of the Text's backing array are sent.
+ public void splitTablet(ZooLock lock, KeyExtent extent, Text splitPoint) throws TException, ThriftSecurityException, NotServingTabletException {
+ TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+ try {
+ client.splitTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), extent.toThrift(),
+ ByteBuffer.wrap(splitPoint.getBytes(), 0, splitPoint.getLength()));
+ } finally {
+ ThriftUtil.returnClient(client);
+ }
+ }
+
+ // Requests a flush of a single tablet
+ public void flushTablet(ZooLock lock, KeyExtent extent) throws TException {
+ TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+ try {
+ client.flushTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
+ } finally {
+ ThriftUtil.returnClient(client);
+ }
+ }
+
+ // Requests a compaction of the given table for the row range; a null startRow or endRow is forwarded as null
+ public void compact(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException {
+ TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+ try {
+ client.compact(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId,
+ startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow));
+ } finally {
+ ThriftUtil.returnClient(client);
+ }
+ }
+
+ // Forwards tid to the tablet server's isActive RPC; unlike the other calls, no credentials or lock are sent
+ public boolean isActive(long tid) throws TException {
+ TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+ try {
+ return client.isActive(Tracer.traceInfo(), tid);
+ } finally {
+ ThriftUtil.returnClient(client);
+ }
+ }
+
+ }
+
+ // Pairs a tablet server's identity with the connection object used to talk to it
+ static class TServerInfo {
+ TServerConnection connection;
+ TServerInstance instance;
+
+ TServerInfo(TServerInstance instance, TServerConnection connection) {
+ this.connection = connection;
+ this.instance = instance;
+ }
+ };
+
+ // The set of active tservers with locks, indexed by their name in zookeeper
+ private Map<String,TServerInfo> current = new HashMap<String,TServerInfo>();
+ // as above, indexed by TServerInstance
+ private Map<TServerInstance,TServerInfo> currentInstances = new HashMap<TServerInstance,TServerInfo>();
+
+ // The set of entries in zookeeper without locks, and the first time each was noticed
+ private Map<String,Long> locklessServers = new HashMap<String,Long>();
+
+ /**
+ * Stores the collaborators only; no ZooKeeper access happens here. The ZooCache is created on first use by getZooCache(), and scanning begins when
+ * startListeningForTabletServerChanges() or scanServers() is called.
+ *
+ * @param cback
+ *          listener invoked when servers are added or removed
+ */
+ public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) {
+ this.cback = cback;
+ this.instance = instance;
+ this.conf = conf;
+
+ }
+
+ /**
+ * Returns the ZooCache used by this set, creating it on first use with this object registered as its watcher.
+ */
+ public synchronized ZooCache getZooCache() {
+   if (zooCache != null)
+     return zooCache;
+   zooCache = new ZooCache(this);
+   return zooCache;
+ }
+
+ /**
+ * Performs one scan immediately on the calling thread, then registers a task with SimpleTimer that calls scanServers() again.
+ */
+ public synchronized void startListeningForTabletServerChanges() {
+ scanServers();
+ // NOTE(review): the arguments are presumably an initial delay of 0 and a period of 5000 ms -- confirm against SimpleTimer.schedule
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ scanServers();
+ }
+ }, 0, 5000);
+ }
+
+ /**
+ * Re-checks every known or advertised tablet server and notifies the listener if any were added or removed. Any exception is logged and swallowed, so this
+ * method never throws.
+ */
+ public synchronized void scanServers() {
+ try {
+ final Set<TServerInstance> updates = new HashSet<TServerInstance>();
+ final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
+
+ final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+
+ // union of the names we already track and the children currently listed under the tservers path
+ HashSet<String> all = new HashSet<String>(current.keySet());
+ all.addAll(getZooCache().getChildren(path));
+
+ // forget lockless entries whose node is neither tracked nor listed any more
+ locklessServers.keySet().retainAll(all);
+
+ for (String zPath : all) {
+ checkServer(updates, doomed, path, zPath);
+ }
+
+ // log.debug("Current: " + current.keySet());
+ if (!doomed.isEmpty() || !updates.isEmpty())
+ this.cback.update(this, doomed, updates);
+ } catch (Exception ex) {
+ log.error(ex, ex);
+ }
+ }
+
+ /**
+ * Deletes the given ZooKeeper node regardless of its version (-1). A node that has gained children or has already been deleted is silently left alone.
+ */
+ private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException {
+ try {
+ ZooReaderWriter.getInstance().delete(serverNode, -1);
+ } catch (NotEmptyException ex) {
+ // race condition: tserver created the lock after our last check; we'll see it at the next check
+ } catch (NoNodeException nne) {
+ // someone else deleted it
+ }
+ }
+
+ /**
+ * Reconciles the cached state of one tablet server node with the lock data currently visible through ZooCache.
+ * <p>
+ * If the node holds no lock, any instance tracked under that name is reported as doomed and dropped; a node that stays lockless for more than ten minutes is
+ * deleted. If the node holds a lock, the instance it describes is compared with the tracked one: a new instance is reported in updates, and a changed
+ * instance is reported as doomed (old) plus updates (new).
+ *
+ * @param updates
+ *          out-parameter collecting instances that appeared
+ * @param doomed
+ *          out-parameter collecting instances that went away
+ * @param path
+ *          the ZooKeeper path of the tservers directory
+ * @param zPath
+ *          the child node name of the server to check
+ */
+ private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String zPath)
+     throws TException, InterruptedException, KeeperException {
+
+   TServerInfo info = current.get(zPath);
+
+   final String lockPath = path + "/" + zPath;
+   Stat stat = new Stat();
+   byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat);
+
+   if (lockData == null) {
+     if (info != null) {
+       doomed.add(info.instance);
+       current.remove(zPath);
+       currentInstances.remove(info.instance);
+     }
+
+     // remember when the node was first seen without a lock; clean it up once it has been lockless for 10 minutes
+     Long firstSeen = locklessServers.get(zPath);
+     if (firstSeen == null) {
+       locklessServers.put(zPath, System.currentTimeMillis());
+     } else if (System.currentTimeMillis() - firstSeen > 10 * 60 * 1000) {
+       deleteServerNode(lockPath);
+       locklessServers.remove(zPath);
+     }
+   } else {
+     locklessServers.remove(zPath);
+     ServerServices services = new ServerServices(new String(lockData));
+     HostAndPort client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
+     TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner());
+
+     if (info == null) {
+       updates.add(instance);
+       TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client));
+       current.put(zPath, tServerInfo);
+       currentInstances.put(instance, tServerInfo);
+     } else if (!info.instance.equals(instance)) {
+       doomed.add(info.instance);
+       updates.add(instance);
+       TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client));
+       current.put(zPath, tServerInfo);
+       // keep the instance index in step with 'current': drop the stale key and register the new instance under its own key
+       currentInstances.remove(info.instance);
+       currentInstances.put(instance, tServerInfo);
+     }
+   }
+ }
+
+ /**
+ * ZooKeeper watcher callback. An event on the tservers directory itself triggers a full scan; an event on a direct child of that directory re-checks just
+ * that server and notifies the listener of any change. Events without a path, or on any other path, are ignored.
+ */
+ @Override
+ public void process(WatchedEvent event) {
+
+   // It is important that these events are propagated by ZooCache: that guarantees ZooCache has already processed the event and cleared the relevant nodes
+   // before the code below reads from it.
+
+   final String eventPath = event.getPath();
+   if (eventPath == null)
+     return;
+
+   if (eventPath.endsWith(Constants.ZTSERVERS)) {
+     scanServers();
+     return;
+   }
+
+   if (!eventPath.contains(Constants.ZTSERVERS))
+     return;
+
+   // only act when the tservers directory is the immediate parent of the node
+   final int lastSlash = eventPath.lastIndexOf('/');
+   if (lastSlash < 0 || !eventPath.substring(0, lastSlash).endsWith(Constants.ZTSERVERS))
+     return;
+
+   final String serverName = eventPath.substring(lastSlash + 1);
+   final Set<TServerInstance> added = new HashSet<TServerInstance>();
+   final Set<TServerInstance> removed = new HashSet<TServerInstance>();
+   final String tserversPath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+
+   try {
+     checkServer(added, removed, tserversPath, serverName);
+     if (!removed.isEmpty() || !added.isEmpty())
+       this.cback.update(this, removed, added);
+   } catch (Exception ex) {
+     log.error(ex, ex);
+   }
+ }
+
+ /**
+ * Looks up the connection for a tracked tablet server.
+ *
+ * @return the connection, or null when server is null or not currently tracked
+ */
+ public synchronized TServerConnection getConnection(TServerInstance server) throws TException {
+   if (server == null)
+     return null;
+   final TServerInfo known = currentInstances.get(server);
+   return known == null ? null : known.connection;
+ }
+
+ /**
+ * Returns a copy of the currently tracked instances; changes to the returned set do not affect this object.
+ */
+ public synchronized Set<TServerInstance> getCurrentServers() {
+ return new HashSet<TServerInstance>(currentInstances.keySet());
+ }
+
+ /**
+ * Returns the number of tablet servers tracked by their ZooKeeper node name.
+ */
+ public synchronized int size() {
+ return current.size();
+ }
+
+ /**
+ * Finds a tracked instance by address.
+ *
+ * @param tabletServer
+ *          address string, parsed with AddressUtil.parseAddress
+ * @return the first tracked instance whose location equals the parsed address, or null if there is none
+ */
+ public synchronized TServerInstance find(String tabletServer) {
+   final HostAndPort wanted = AddressUtil.parseAddress(tabletServer);
+   for (TServerInfo candidate : current.values()) {
+     if (candidate.instance.getLocation().equals(wanted))
+       return candidate.instance;
+   }
+   return null;
+ }
+
+ /**
+ * Stops tracking the given server and recursively deletes its node under the tservers path. Does nothing if the server is not tracked. If the delete fails,
+ * the error is logged as fatal and Halt.halt is called with status -1.
+ */
+ public synchronized void remove(TServerInstance server) {
+ // find the zookeeper node name under which this instance is tracked
+ String zPath = null;
+ for (Entry<String,TServerInfo> entry : current.entrySet()) {
+ if (entry.getValue().instance.equals(server)) {
+ zPath = entry.getKey();
+ break;
+ }
+ }
+ if (zPath == null)
+ return;
+ current.remove(zPath);
+ currentInstances.remove(server);
+
+ log.info("Removing zookeeper lock for " + server);
+ String fullpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + zPath;
+ try {
+ ZooReaderWriter.getRetryingInstance().recursiveDelete(fullpath, SKIP);
+ } catch (Exception e) {
+ String msg = "error removing tablet server lock";
+ log.fatal(msg, e);
+ Halt.halt(msg, -1);
+ }
+ // drop cached data for the removed path so later reads do not see the deleted node
+ getZooCache().clear(fullpath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
new file mode 100644
index 0000000..ec3371c
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.balancer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.thrift.TException;
+
+/**
+ * A chaotic load balancer used for testing. It constantly shuffles tablets, preventing them from resting in a single location for very long. This is not
+ * designed for performance, do not use on production systems. I'm calling it the LokiLoadBalancer.
+ */
+public class ChaoticLoadBalancer extends TabletBalancer {
+ Random r = new Random();
+
+ /**
+ * Randomly assigns every unassigned tablet to a tablet server, preferring servers that report fewer tablets than the computed average.
+ * <p>
+ * The average only counts tablets being assigned in this call, so servers that already host tablets may all sit at or above it. When no under-average
+ * server is available (initially, or once the candidates have been used up), a server is picked uniformly at random from all live servers rather than
+ * failing. With no live servers at all, nothing is assigned.
+ *
+ * @param current
+ *          live tablet servers and their last reported status
+ * @param unassigned
+ *          tablets needing a location; only the keys are used
+ * @param assignments
+ *          out-parameter receiving the chosen destination for each unassigned tablet
+ */
+ @Override
+ public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
+     Map<KeyExtent,TServerInstance> assignments) {
+   // without any live tablet server there is nowhere to place a tablet
+   if (current.isEmpty())
+     return;
+   List<TServerInstance> allServers = new ArrayList<TServerInstance>(current.keySet());
+
+   long total = assignments.size() + unassigned.size();
+   long avg = (long) Math.ceil(((double) total) / current.size());
+   Map<TServerInstance,Long> toAssign = new HashMap<TServerInstance,Long>();
+   List<TServerInstance> tServerArray = new ArrayList<TServerInstance>();
+   for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
+     long numTablets = 0;
+     for (TableInfo ti : e.getValue().getTableMap().values()) {
+       numTablets += ti.tablets;
+     }
+     if (numTablets < avg) {
+       tServerArray.add(e.getKey());
+       toAssign.put(e.getKey(), avg - numTablets);
+     }
+   }
+
+   for (KeyExtent ke : unassigned.keySet()) {
+     if (tServerArray.isEmpty()) {
+       // Random.nextInt(0) would throw; fall back to any live server so the tablet still gets a home
+       assignments.put(ke, allServers.get(r.nextInt(allServers.size())));
+       continue;
+     }
+     int index = r.nextInt(tServerArray.size());
+     TServerInstance dest = tServerArray.get(index);
+     assignments.put(ke, dest);
+     long remaining = toAssign.get(dest).longValue() - 1;
+     if (remaining == 0) {
+       // this server has reached the average: stop considering it
+       tServerArray.remove(index);
+       toAssign.remove(dest);
+     } else {
+       toAssign.put(dest, remaining);
+     }
+   }
+ }
+
+ /**
+ * Will balance randomly, maintaining distribution.
+ * <p>
+ * Proposes nothing while migrations are outstanding. Otherwise every online tablet is considered for a move to a randomly chosen server from the
+ * under-capacity list; tablets of the metadata table are only considered on roughly one call in four. Always returns 100.
+ */
+ @Override
+ public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
+ Map<TServerInstance,Long> numTablets = new HashMap<TServerInstance,Long>();
+ List<TServerInstance> underCapacityTServer = new ArrayList<TServerInstance>();
+
+ if (!migrations.isEmpty())
+ return 100;
+
+ // true with probability 1/4
+ boolean moveMetadata = r.nextInt(4) == 0;
+ long totalTablets = 0;
+ // every server starts out as a candidate destination, whatever its load
+ for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
+ long tabletCount = 0;
+ for (TableInfo ti : e.getValue().getTableMap().values()) {
+ tabletCount += ti.tablets;
+ }
+ numTablets.put(e.getKey(), tabletCount);
+ underCapacityTServer.add(e.getKey());
+ totalTablets += tabletCount;
+ }
+ // totalTablets is fuzzy due to asynchronicity of the stats
+ // *1.2 to handle fuzziness, and prevent locking for 'perfect' balancing scenarios
+ long avg = (long) Math.ceil(((double) totalTablets) / current.size() * 1.2);
+
+ for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
+ for (String table : e.getValue().getTableMap().keySet()) {
+ if (!moveMetadata && MetadataTable.NAME.equals(table))
+ continue;
+ try {
+ for (TabletStats ts : getOnlineTabletsForTable(e.getKey(), table)) {
+ KeyExtent ke = new KeyExtent(ts.extent);
+ int index = r.nextInt(underCapacityTServer.size());
+ TServerInstance dest = underCapacityTServer.get(index);
+ // picked the server that already hosts the tablet: leave it where it is
+ if (dest.equals(e.getKey()))
+ continue;
+ migrationsOut.add(new TabletMigration(ke, e.getKey(), dest));
+ // Map.put returns the previous value, so both comparisons below test the count from before this migration was recorded
+ if (numTablets.put(dest, numTablets.get(dest) + 1) > avg)
+ underCapacityTServer.remove(index);
+ if (numTablets.put(e.getKey(), numTablets.get(e.getKey()) - 1) <= avg && !underCapacityTServer.contains(e.getKey()))
+ underCapacityTServer.add(e.getKey());
+
+ // We can get some craziness with only 1 tserver, so lets make sure there's always an option!
+ if (underCapacityTServer.isEmpty())
+ underCapacityTServer.addAll(numTablets.keySet());
+ }
+ } catch (ThriftSecurityException e1) {
+ // Shouldn't happen, but carry on if it does
+ e1.printStackTrace();
+ } catch (TException e1) {
+ // Shouldn't happen, but carry on if it does
+ e1.printStackTrace();
+ }
+ }
+ }
+
+ return 100;
+ }
+
+ // Delegates to the superclass; this balancer adds no initialization of its own
+ @Override
+ public void init(ServerConfiguration conf) {
+ super.init(conf);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
new file mode 100644
index 0000000..9b88d74
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.balancer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.log4j.Logger;
+
+public class DefaultLoadBalancer extends TabletBalancer {
+
+ private static final Logger log = Logger.getLogger(DefaultLoadBalancer.class);
+
+ Iterator<TServerInstance> assignments;
+ // if tableToBalance is set, then only balance the given table
+ String tableToBalance = null;
+
+ // Creates a balancer that considers all tables (tableToBalance stays null)
+ public DefaultLoadBalancer() {
+
+ }
+
+ // Creates a balancer restricted to the given table; the value is compared against the keys of each server's tableMap
+ public DefaultLoadBalancer(String table) {
+ tableToBalance = table;
+ }
+
+ List<TServerInstance> randomize(Set<TServerInstance> locations) {
+ List<TServerInstance> result = new ArrayList<TServerInstance>(locations);
+ Collections.shuffle(result);
+ return result;
+ }
+
+ /**
+ * Picks a tablet server for one tablet.
+ * <p>
+ * When a last location is given and the first server at or after it in the sorted map is on the same host, that server is returned. Otherwise servers are
+ * handed out one at a time from a shuffled iterator that is kept between calls.
+ *
+ * @return the chosen server, or null when locations is empty
+ */
+ public TServerInstance getAssignment(SortedMap<TServerInstance,TabletServerStatus> locations, KeyExtent extent, TServerInstance last) {
+ if (locations.size() == 0)
+ return null;
+
+ if (last != null) {
+ // Maintain locality
+ // probe key: same location as 'last' but with an empty session
+ TServerInstance simple = new TServerInstance(last.getLocation(), "");
+ Iterator<TServerInstance> find = locations.tailMap(simple).keySet().iterator();
+ if (find.hasNext()) {
+ TServerInstance current = find.next();
+ if (current.host().equals(last.host()))
+ return current;
+ }
+ }
+
+ // The strategy here is to walk through the locations and hand them back, one at a time
+ // Grab an iterator off of the set of options; use a new iterator if it hands back something not in the current list.
+ if (assignments == null || !assignments.hasNext())
+ assignments = randomize(locations.keySet()).iterator();
+ TServerInstance result = assignments.next();
+ if (!locations.containsKey(result)) {
+ // the saved iterator came from an older server list: discard it and answer from a fresh shuffle
+ assignments = null;
+ return randomize(locations.keySet()).iterator().next();
+ }
+ return result;
+ }
+
+ /**
+ * A tablet server together with its tablet count and last status. Natural order is by ascending count, with ties broken by the server's own ordering.
+ */
+ static class ServerCounts implements Comparable<ServerCounts> {
+   public final TServerInstance server;
+   public final int count;
+   public final TabletServerStatus status;
+
+   ServerCounts(int count, TServerInstance server, TabletServerStatus status) {
+     this.count = count;
+     this.server = server;
+     this.status = status;
+   }
+
+   @Override
+   public int compareTo(ServerCounts obj) {
+     // compare explicitly rather than subtracting, which can overflow for counts of opposite sign
+     if (count != obj.count)
+       return count < obj.count ? -1 : 1;
+     return server.compareTo(obj.server);
+   }
+ }
+
+ /**
+ * Proposes migrations that move tablets from the most loaded servers to the least loaded ones, based on the online tablet counts in each server's status
+ * (restricted to tableToBalance when it is set).
+ *
+ * @param current
+ *          servers and their last reported status; a null status or tableMap counts as zero tablets
+ * @param result
+ *          out-parameter receiving the proposed migrations
+ * @return true when some server still had more to unload than its partner could take, i.e. another pass is needed
+ */
+ public boolean getMigrations(Map<TServerInstance,TabletServerStatus> current, List<TabletMigration> result) {
+ boolean moreBalancingNeeded = false;
+ try {
+ // no moves possible
+ if (current.size() < 2) {
+ return false;
+ }
+
+ // Sort by total number of online tablets, per server
+ int total = 0;
+ ArrayList<ServerCounts> totals = new ArrayList<ServerCounts>();
+ for (Entry<TServerInstance,TabletServerStatus> entry : current.entrySet()) {
+ int serverTotal = 0;
+ if (entry.getValue() != null && entry.getValue().tableMap != null) {
+ for (Entry<String,TableInfo> e : entry.getValue().tableMap.entrySet()) {
+ /**
+ * The check below was on entry.getKey(), but that resolves to a tabletserver not a tablename. Believe it should be e.getKey() which is a tablename
+ */
+ if (tableToBalance == null || tableToBalance.equals(e.getKey()))
+ serverTotal += e.getValue().onlineTablets;
+ }
+ }
+ totals.add(new ServerCounts(serverTotal, entry.getKey(), entry.getValue()));
+ total += serverTotal;
+ }
+
+ // order from high to low: sort ascending, then reverse, so index 0 is the most loaded server
+ Collections.sort(totals);
+ Collections.reverse(totals);
+ // every server should end with 'even' tablets; the first 'numServersOverEven' servers get one extra
+ int even = total / totals.size();
+ int numServersOverEven = total % totals.size();
+
+ // Move tablets from the servers with too many to the servers with
+ // the fewest but only nominate tablets to move once. This allows us
+ // to fill new servers with tablets from a mostly balanced server
+ // very quickly. However, it may take several balancing passes to move
+ // tablets from one hugely overloaded server to many slightly
+ // under-loaded servers.
+ int end = totals.size() - 1;
+ // tablets already proposed for the server at 'end' by earlier iterations
+ int movedAlready = 0;
+ for (int tooManyIndex = 0; tooManyIndex < totals.size(); tooManyIndex++) {
+ ServerCounts tooMany = totals.get(tooManyIndex);
+ int goal = even;
+ if (tooManyIndex < numServersOverEven) {
+ goal++;
+ }
+ int needToUnload = tooMany.count - goal;
+ ServerCounts tooLittle = totals.get(end);
+ int needToLoad = goal - tooLittle.count - movedAlready;
+ if (needToUnload < 1 && needToLoad < 1) {
+ break;
+ }
+ if (needToUnload >= needToLoad) {
+ // the receiving server is filled by this move: advance to the next least-loaded server
+ result.addAll(move(tooMany, tooLittle, needToLoad));
+ end--;
+ movedAlready = 0;
+ } else {
+ result.addAll(move(tooMany, tooLittle, needToUnload));
+ movedAlready += needToUnload;
+ }
+ if (needToUnload > needToLoad)
+ moreBalancingNeeded = true;
+ }
+
+ } finally {
+ log.debug("balance ended with " + result.size() + " migrations");
+ }
+ return moreBalancingNeeded;
+ }
+
+ // Simple holder pairing a table with a count difference
+ static class TableDiff {
+ int diff;
+ String table;
+
+ public TableDiff(int diff, String table) {
+ this.diff = diff;
+ this.table = table;
+ }
+ };
+
+ /**
+ * Select a tablet based on differences between table loads; if the loads are even, use the busiest table.
+ * <p>
+ * Proposes up to count migrations from tooMuch to tooLittle. For each one a table is chosen (the table with the largest per-table count difference between
+ * the two servers, or the busiest table on the source when no difference reaches 2, or tableToBalance when set), and the tablet of that table picked by
+ * selectTablet is nominated. Stops early, returning what has been collected so far, when no table or tablet can be selected or the tablet list cannot be
+ * fetched.
+ *
+ * @param tooMuch
+ *          the server to take tablets from
+ * @param tooLittle
+ *          the server to give tablets to
+ * @param count
+ *          the number of tablets to move; values of zero or less yield an empty list
+ * @return the proposed migrations, possibly fewer than count
+ */
+ List<TabletMigration> move(ServerCounts tooMuch, ServerCounts tooLittle, int count) {
+
+   List<TabletMigration> result = new ArrayList<TabletMigration>();
+   if (count <= 0)
+     return result;
+
+   Map<String,Map<KeyExtent,TabletStats>> onlineTablets = new HashMap<String,Map<KeyExtent,TabletStats>>();
+   // Copy counts so we can update them as we propose migrations
+   Map<String,Integer> tooMuchMap = tabletCountsPerTable(tooMuch.status);
+   Map<String,Integer> tooLittleMap = tabletCountsPerTable(tooLittle.status);
+
+   for (int i = 0; i < count; i++) {
+     String table;
+     if (tableToBalance == null) {
+       // find a table to migrate
+       // look for an uneven table count
+       int biggestDifference = 0;
+       String biggestDifferenceTable = null;
+       for (Entry<String,Integer> tableEntry : tooMuchMap.entrySet()) {
+         String tableID = tableEntry.getKey();
+         Integer littleCount = tooLittleMap.get(tableID);
+         if (littleCount == null) {
+           littleCount = 0;
+           tooLittleMap.put(tableID, littleCount);
+         }
+         int diff = tableEntry.getValue() - littleCount;
+         if (diff > biggestDifference) {
+           biggestDifference = diff;
+           biggestDifferenceTable = tableID;
+         }
+       }
+       if (biggestDifference < 2) {
+         // a missing status or table map is tolerated elsewhere in this class, so tolerate it here as well
+         Map<String,TableInfo> sourceTables = tooMuch.status == null ? null : tooMuch.status.tableMap;
+         table = sourceTables == null ? null : busiest(sourceTables);
+       } else {
+         table = biggestDifferenceTable;
+       }
+     } else {
+       // just balance the given table
+       table = tableToBalance;
+     }
+     if (table == null) {
+       // the source server reports no tables, so there is nothing to pick a tablet from
+       return result;
+     }
+     Map<KeyExtent,TabletStats> onlineTabletsForTable = onlineTablets.get(table);
+     try {
+       if (onlineTabletsForTable == null) {
+         // fetch the source server's tablets for this table once and reuse them for later picks
+         onlineTabletsForTable = new HashMap<KeyExtent,TabletStats>();
+         for (TabletStats stat : getOnlineTabletsForTable(tooMuch.server, table))
+           onlineTabletsForTable.put(new KeyExtent(stat.extent), stat);
+         onlineTablets.put(table, onlineTabletsForTable);
+       }
+     } catch (Exception ex) {
+       log.error("Unable to select a tablet to move", ex);
+       return result;
+     }
+     KeyExtent extent = selectTablet(tooMuch.server, onlineTabletsForTable);
+     if (extent == null)
+       return result;
+     // never nominate the same tablet twice
+     onlineTabletsForTable.remove(extent);
+
+     // the status snapshot may not list this table; only adjust a count that exists
+     Integer tooMuchCount = tooMuchMap.get(table);
+     if (tooMuchCount != null)
+       tooMuchMap.put(table, tooMuchCount - 1);
+     /**
+      * If a table grows from 1 tablet then tooLittleMap.get(table) can return a null, since there is only one tabletserver that holds all of the tablets. Here
+      * we check to see if in fact that is the case and if so set the value to 0.
+      */
+     Integer tooLittleCount = tooLittleMap.get(table);
+     if (tooLittleCount == null) {
+       tooLittleCount = 0;
+     }
+     tooLittleMap.put(table, tooLittleCount + 1);
+
+     result.add(new TabletMigration(extent, tooMuch.server, tooLittle.server));
+   }
+   return result;
+ }
+
+ static Map<String,Integer> tabletCountsPerTable(TabletServerStatus status) {
+ Map<String,Integer> result = new HashMap<String,Integer>();
+ if (status != null && status.tableMap != null) {
+ Map<String,TableInfo> tableMap = status.tableMap;
+ for (Entry<String,TableInfo> entry : tableMap.entrySet()) {
+ result.put(entry.getKey(), entry.getValue().onlineTablets);
+ }
+ }
+ return result;
+ }
+
+ static KeyExtent selectTablet(TServerInstance tserver, Map<KeyExtent,TabletStats> extents) {
+ if (extents.size() == 0)
+ return null;
+ KeyExtent mostRecentlySplit = null;
+ long splitTime = 0;
+ for (Entry<KeyExtent,TabletStats> entry : extents.entrySet())
+ if (entry.getValue().splitCreationTime >= splitTime) {
+ splitTime = entry.getValue().splitCreationTime;
+ mostRecentlySplit = entry.getKey();
+ }
+ return mostRecentlySplit;
+ }
+
+ // define what it means for a tablet to be busy
+ private static String busiest(Map<String,TableInfo> tables) {
+ String result = null;
+ double busiest = Double.NEGATIVE_INFINITY;
+ for (Entry<String,TableInfo> entry : tables.entrySet()) {
+ TableInfo info = entry.getValue();
+ double busy = info.ingestRate + info.queryRate;
+ if (busy > busiest) {
+ busiest = busy;
+ result = entry.getKey();
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Assigns each unassigned tablet independently through getAssignment, passing the map value as the tablet's last location. The stored destination is null
+ * when getAssignment returns null (no servers available).
+ */
+ @Override
+ public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
+ Map<KeyExtent,TServerInstance> assignments) {
+ for (Entry<KeyExtent,TServerInstance> entry : unassigned.entrySet()) {
+ assignments.put(entry.getKey(), getAssignment(current, entry.getKey(), entry.getValue()));
+ }
+ }
+
+ /**
+ * Runs one balancing pass when there is at least one server and no migration is outstanding.
+ *
+ * @return 1000 when a pass ran and reported that more balancing is needed, otherwise 5000
+ */
+ @Override
+ public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
+   // short-circuit order matters: only look for migrations when servers exist and nothing is already in flight
+   if (current.size() > 0 && migrations.size() == 0 && getMigrations(current, migrationsOut))
+     return 1 * 1000;
+   return 5 * 1000;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
new file mode 100644
index 0000000..3e0a2bf
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.balancer;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.log4j.Logger;
+
+ /**
+  * A balancer that delegates to one balancer instance per table. The balancer class for a table is read from
+  * {@link Property#TABLE_LOAD_BALANCER}; {@link DefaultLoadBalancer} is used when none is configured or the configured class cannot be constructed.
+  */
+ public class TableLoadBalancer extends TabletBalancer {
+
+   private static final Logger log = Logger.getLogger(TableLoadBalancer.class);
+
+   Map<String,TabletBalancer> perTableBalancers = new HashMap<String,TabletBalancer>();
+
+   /**
+    * Loads the named balancer class and instantiates it through its single-String constructor.
+    */
+   private TabletBalancer constructNewBalancerForTable(String clazzName, String table) throws Exception {
+     Class<? extends TabletBalancer> balancerClass = AccumuloVFSClassLoader.loadClass(clazzName, TabletBalancer.class);
+     return balancerClass.getConstructor(String.class).newInstance(table);
+   }
+
+   /**
+    * @return the value of {@link Property#TABLE_LOAD_BALANCER} in the table's configuration
+    */
+   protected String getLoadBalancerClassNameForTable(String table) {
+     return configuration.getTableConfiguration(table).get(Property.TABLE_LOAD_BALANCER);
+   }
+
+   /**
+    * Returns the balancer for a table, creating or replacing the cached instance when needed.
+    * <ul>
+    * <li>A cached balancer whose class name matches the configured one is returned unchanged.
+    * <li>A cached balancer of a different class is replaced if the configured class can be constructed; otherwise the cached one is kept.
+    * <li>With nothing cached, the configured class is constructed, falling back to {@link DefaultLoadBalancer} on failure.
+    * </ul>
+    */
+   protected TabletBalancer getBalancerForTable(String table) {
+     TabletBalancer cached = perTableBalancers.get(table);
+
+     String configured = getLoadBalancerClassNameForTable(table);
+     String wanted = configured != null ? configured : DefaultLoadBalancer.class.getName();
+
+     if (cached != null) {
+       if (wanted.equals(cached.getClass().getName()))
+         return cached;
+
+       // the balancer class for this table does not match the class specified in the configuration
+       try {
+         // attempt to construct a balancer with the specified class
+         TabletBalancer replacement = constructNewBalancerForTable(wanted, table);
+         if (replacement != null) {
+           cached = replacement;
+           perTableBalancers.put(table, replacement);
+           replacement.init(configuration);
+         }
+       } catch (Exception e) {
+         log.warn("Failed to load table balancer class " + wanted + " for table " + table, e);
+       }
+       return cached;
+     }
+
+     // nothing cached yet: build the configured balancer, or the default one if that fails
+     TabletBalancer created = null;
+     try {
+       created = constructNewBalancerForTable(wanted, table);
+       log.info("Loaded class " + wanted + " for table " + table);
+     } catch (Exception e) {
+       log.warn("Failed to load table balancer class " + wanted + " for table " + table, e);
+     }
+     if (created == null) {
+       log.info("Using balancer " + DefaultLoadBalancer.class.getName() + " for table " + table);
+       created = new DefaultLoadBalancer(table);
+     }
+     perTableBalancers.put(table, created);
+     created.init(configuration);
+     return created;
+   }
+
+   /**
+    * Groups the unassigned tablets by table id and lets each table's balancer compute the assignments for its own tablets.
+    */
+   @Override
+   public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
+       Map<KeyExtent,TServerInstance> assignments) {
+     // separate the unassigned into tables
+     Map<String,Map<KeyExtent,TServerInstance>> byTable = new HashMap<String,Map<KeyExtent,TServerInstance>>();
+     for (Entry<KeyExtent,TServerInstance> pending : unassigned.entrySet()) {
+       String tableId = pending.getKey().getTableId().toString();
+       Map<KeyExtent,TServerInstance> forTable = byTable.get(tableId);
+       if (forTable == null) {
+         forTable = new HashMap<KeyExtent,TServerInstance>();
+         byTable.put(tableId, forTable);
+       }
+       forTable.put(pending.getKey(), pending.getValue());
+     }
+     for (Entry<String,Map<KeyExtent,TServerInstance>> group : byTable.entrySet()) {
+       // collect into a scratch map first, then copy the results to the caller's map
+       Map<KeyExtent,TServerInstance> tableAssignments = new HashMap<KeyExtent,TServerInstance>();
+       getBalancerForTable(group.getKey()).getAssignments(current, group.getValue(), tableAssignments);
+       assignments.putAll(tableAssignments);
+     }
+   }
+
+   private TableOperations tops = null;
+
+   /**
+    * Lazily obtains table operations using the system credentials.
+    *
+    * @return the cached or newly obtained table operations, or null if the connector could not be created (the error is logged)
+    */
+   protected TableOperations getTableOperations() {
+     if (tops != null)
+       return tops;
+     try {
+       tops = configuration.getInstance().getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken()).tableOperations();
+     } catch (AccumuloException e) {
+       log.error("Unable to access table operations from within table balancer", e);
+     } catch (AccumuloSecurityException e) {
+       log.error("Unable to access table operations from within table balancer", e);
+     }
+     return tops;
+   }
+
+   /**
+    * Runs the per-table balancer of every table and gathers the migrations they propose.
+    *
+    * @return the smallest wait time returned by any table balancer, capped at 5000 ms; 5000 ms when table operations are unavailable
+    */
+   @Override
+   public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
+     long shortestWait = 5 * 1000;
+     TableOperations tableOps = getTableOperations();
+     if (tableOps == null)
+       return shortestWait;
+     // Iterate over the tables and balance each of them
+     for (String tableId : tableOps.tableIdMap().values()) {
+       ArrayList<TabletMigration> proposed = new ArrayList<TabletMigration>();
+       long tableWait = getBalancerForTable(tableId).balance(current, migrations, proposed);
+       shortestWait = Math.min(shortestWait, tableWait);
+       migrationsOut.addAll(proposed);
+     }
+     return shortestWait;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
new file mode 100644
index 0000000..fd76ce2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.balancer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+
+ /**
+  * Base class for balancers, which decide where unassigned tablets go and which tablets should migrate between tablet servers.
+  */
+ public abstract class TabletBalancer {
+
+   private static final Logger log = Logger.getLogger(TabletBalancer.class);
+
+   protected ServerConfiguration configuration;
+
+   /**
+    * Initialize the TabletBalancer. This gives the balancer the opportunity to read the configuration.
+    *
+    * @param conf
+    *          the server configuration, kept in {@link #configuration}
+    */
+   public void init(ServerConfiguration conf) {
+     this.configuration = conf;
+   }
+
+   /**
+    * Assign tablets to tablet servers. This method is called whenever the master finds tablets that are unassigned.
+    *
+    * @param current
+    *          The current table-summary state of all the online tablet servers. Read-only. The TabletServerStatus for each server may be null if the tablet
+    *          server has not yet responded to a recent request for status.
+    * @param unassigned
+    *          A map from unassigned tablet to the last known tablet server. Read-only.
+    * @param assignments
+    *          A map from tablet to assigned server. Write-only.
+    */
+   public abstract void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
+       Map<KeyExtent,TServerInstance> assignments);
+
+   /**
+    * Ask the balancer if any migrations are necessary. This method will not be called when there are unassigned tablets.
+    *
+    * @param current
+    *          The current table-summary state of all the online tablet servers. Read-only.
+    * @param migrations
+    *          the current set of migrations. Read-only.
+    * @param migrationsOut
+    *          new migrations to perform; should not contain tablets in the current set of migrations. Write-only.
+    * @return the time, in milliseconds, to wait before re-balancing.
+    */
+   public abstract long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut);
+
+   /**
+    * Fetch the tablets for the given table by asking the tablet server. Useful if your balance strategy needs details at the tablet level to decide what tablets
+    * to move.
+    *
+    * @param tserver
+    *          The tablet server to ask.
+    * @param tableId
+    *          The table id
+    * @return the tablet statistics reported by the server, or null when a transport error occurred (the error is logged, not rethrown)
+    * @throws ThriftSecurityException
+    *           tablet server disapproves of your internal System password.
+    * @throws TException
+    *           any other problem
+    */
+   public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, String tableId) throws ThriftSecurityException, TException {
+     log.debug("Scanning tablet server " + tserver + " for table " + tableId);
+     Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), tserver.getLocation(), configuration.getConfiguration());
+     try {
+       return client.getTabletStats(Tracer.traceInfo(), SystemCredentials.get().toThrift(configuration.getInstance()), tableId);
+     } catch (TTransportException e) {
+       log.error("Unable to connect to " + tserver + ": " + e);
+       return null;
+     } finally {
+       // the client is handed back on every path, including when an exception propagates
+       ThriftUtil.returnClient(client);
+     }
+   }
+
+   /**
+    * Utility to ensure that the migrations from balance() are consistent:
+    * <ul>
+    * <li>Tablet objects are not null
+    * <li>Source and destination tablet servers are not null and current
+    * </ul>
+    * Each rejected migration is logged with a warning; only the first failing check is reported for it.
+    *
+    * @param current
+    *          the set of tablet servers considered current
+    * @param migrations
+    *          the migrations to check
+    * @return A list of TabletMigration object that passed sanity checks.
+    */
+   public static List<TabletMigration> checkMigrationSanity(Set<TServerInstance> current, List<TabletMigration> migrations) {
+     List<TabletMigration> accepted = new ArrayList<TabletMigration>(migrations.size());
+     for (TabletMigration candidate : migrations) {
+       if (candidate.tablet == null)
+         log.warn("Balancer gave back a null tablet " + candidate);
+       else if (candidate.newServer == null)
+         log.warn("Balancer did not set the destination " + candidate);
+       else if (candidate.oldServer == null)
+         log.warn("Balancer did not set the source " + candidate);
+       else if (!current.contains(candidate.oldServer))
+         log.warn("Balancer wants to move a tablet from a server that is not current: " + candidate);
+       else if (!current.contains(candidate.newServer))
+         log.warn("Balancer wants to move a tablet to a server that is not current: " + candidate);
+       else
+         accepted.add(candidate);
+     }
+     return accepted;
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
new file mode 100644
index 0000000..b3c0934
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.recovery;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.log4j.Logger;
+
+ /**
+  * {@link LogCloser} for Hadoop file systems: recovers the HDFS lease on a file, and does nothing for a local file system.
+  */
+ public class HadoopLogCloser implements LogCloser {
+
+   private static final Logger log = Logger.getLogger(HadoopLogCloser.class);
+
+   /**
+    * Attempts to recover the lease on {@code source}.
+    *
+    * @param conf
+    *          supplies {@link Property#MASTER_LEASE_RECOVERY_WAITING_PERIOD}
+    * @param fs
+    *          used to look up the file system that holds {@code source}
+    * @param source
+    *          the file whose lease should be recovered
+    * @return 0 when the lease was recovered or the file is on a local file system; otherwise the configured waiting period in milliseconds
+    * @throws IOException
+    *           a FileNotFoundException from lease recovery is rethrown as is; failures of the append fallback also propagate
+    * @throws IllegalStateException
+    *           if the file system is neither a DistributedFileSystem nor a LocalFileSystem
+    */
+   @Override
+   public long close(AccumuloConfiguration conf, VolumeManager fs, Path source) throws IOException {
+     FileSystem ns = fs.getFileSystemByPath(source);
+     if (ns instanceof DistributedFileSystem) {
+       DistributedFileSystem dfs = (DistributedFileSystem) ns;
+       try {
+         if (!dfs.recoverLease(source)) {
+           log.info("Waiting for file to be closed " + source.toString());
+           return conf.getTimeInMillis(Property.MASTER_LEASE_RECOVERY_WAITING_PERIOD);
+         }
+         log.info("Recovered lease on " + source.toString());
+       } catch (FileNotFoundException ex) {
+         // a missing file is passed on unchanged rather than handled by the append fallback
+         throw ex;
+       } catch (Exception ex) {
+         log.warn("Error recovering lease on " + source.toString(), ex);
+         // fall back to opening the file for append and closing it again
+         ns.append(source).close();
+         log.info("Recovered lease on " + source.toString() + " using append");
+       }
+     } else if (ns instanceof LocalFileSystem) {
+       // ignore
+     } else {
+       // report the file system that was actually inspected, not the VolumeManager wrapper
+       throw new IllegalStateException("Don't know how to recover a lease for " + ns.getClass().getName());
+     }
+     return 0;
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
new file mode 100644
index 0000000..deeea61
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.recovery;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.Path;
+
+ /**
+  * Strategy for making sure a file is closed before it is used for recovery.
+  */
+ public interface LogCloser {
+   /**
+    * Tries to close the file at {@code path}.
+    *
+    * @return a time in milliseconds; the implementations in this package return 0 on success and a positive delay when the attempt should be repeated later
+    * @throws IOException
+    *           if the underlying file system operation fails
+    */
+   long close(AccumuloConfiguration conf, VolumeManager fs, Path path) throws IOException;
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
new file mode 100644
index 0000000..bba7ac5
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.recovery;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.log4j.Logger;
+
+ /**
+  * {@link LogCloser} that makes the file read-only (permission 0444) instead of recovering a lease.
+  */
+ public class MapRLogCloser implements LogCloser {
+
+   private static Logger log = Logger.getLogger(MapRLogCloser.class);
+
+   /**
+    * Sets the permission of {@code path} to read-only.
+    *
+    * @return 0 when the permission was changed, 1000 when the change failed with an IOException and should be tried again
+    */
+   @Override
+   public long close(AccumuloConfiguration conf, VolumeManager fs, Path path) throws IOException {
+     log.info("Recovering file " + path.toString() + " by changing permission to readonly");
+     FileSystem ns = fs.getFileSystemByPath(path);
+     FsPermission readOnly = new FsPermission((short) 0444);
+     try {
+       ns.setPermission(path, readOnly);
+     } catch (IOException ex) {
+       log.error("error recovering lease ", ex);
+       // lets do this again
+       return 1000;
+     }
+     return 0;
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/Assignment.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/Assignment.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/Assignment.java
new file mode 100644
index 0000000..b2510fc
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/Assignment.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.master.state.TServerInstance;
+
+ /**
+  * Simple holder that pairs a tablet with a tablet server.
+  */
+ public class Assignment {
+   /** The tablet of this assignment. */
+   public KeyExtent tablet;
+   /** The server of this assignment. */
+   public TServerInstance server;
+
+   /**
+    * @param tablet
+    *          stored in {@link #tablet}
+    * @param server
+    *          stored in {@link #server}
+    */
+   public Assignment(KeyExtent tablet, TServerInstance server) {
+     this.server = server;
+     this.tablet = tablet;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
new file mode 100644
index 0000000..f4d98bf
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.util.Collection;
+import java.util.Set;
+
+ /**
+  * Read access to a snapshot of cluster state: online tables, online tablet servers and merges.
+  */
+ public interface CurrentState {
+
+   /**
+    * @return the online tables (NOTE(review): presumably table ids - confirm against implementations)
+    */
+   Set<String> onlineTables();
+
+   /**
+    * @return the online tablet servers
+    */
+   Set<TServerInstance> onlineTabletServers();
+
+   /**
+    * @return the merges known to this state
+    */
+   Collection<MergeInfo> merges();
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
new file mode 100644
index 0000000..b2ea7d6
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.core.master.thrift.DeadServer;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+ /**
+  * Keeps a list of dead servers as child nodes of a ZooKeeper path. Each child is named after a server and its data holds the cause, stored as UTF-8 text.
+  * All ZooKeeper failures are logged and otherwise ignored.
+  */
+ public class DeadServerList {
+   private static final Logger log = Logger.getLogger(DeadServerList.class);
+   private final String path;
+
+   /**
+    * Remembers the path and tries to create it; a failure to create it is only logged.
+    *
+    * @param path
+    *          the ZooKeeper node under which dead servers are recorded
+    */
+   public DeadServerList(String path) {
+     this.path = path;
+     IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+     try {
+       zoo.mkdirs(path);
+     } catch (Exception ex) {
+       log.error("Unable to make parent directories of " + path, ex);
+     }
+   }
+
+   /**
+    * Reads every child node and converts it to a DeadServer built from the child name, the node's modification time and its data.
+    *
+    * @return the servers read so far; if an error occurs part-way through, the entries collected before it are still returned
+    */
+   public List<DeadServer> getList() {
+     List<DeadServer> result = new ArrayList<DeadServer>();
+     IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+     try {
+       List<String> children = zoo.getChildren(path);
+       if (children != null) {
+         for (String child : children) {
+           Stat stat = new Stat();
+           byte[] data = zoo.getData(path + "/" + child, stat);
+           // decode with an explicit charset so the result does not depend on the JVM's platform default
+           DeadServer server = new DeadServer(child, stat.getMtime(), new String(data, "UTF-8"));
+           result.add(server);
+         }
+       }
+     } catch (Exception ex) {
+       log.error(ex, ex);
+     }
+     return result;
+   }
+
+   /**
+    * Removes the entry for a server; a missing node is skipped.
+    *
+    * @param server
+    *          name of the child node to delete
+    */
+   public void delete(String server) {
+     IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+     try {
+       zoo.recursiveDelete(path + "/" + server, NodeMissingPolicy.SKIP);
+     } catch (Exception ex) {
+       log.error(ex, ex);
+     }
+   }
+
+   /**
+    * Records a server with the given cause; an already existing node is left untouched.
+    *
+    * @param server
+    *          name of the child node to create
+    * @param cause
+    *          text stored as the node's data
+    */
+   public void post(String server, String cause) {
+     IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+     try {
+       // encode with an explicit charset, matching the decoding in getList()
+       zoo.putPersistentData(path + "/" + server, cause.getBytes("UTF-8"), NodeExistsPolicy.SKIP);
+     } catch (Exception ex) {
+       log.error(ex, ex);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java
new file mode 100644
index 0000000..ad658df
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.util.List;
+
+/*
+ * An abstract version of ZooKeeper that we can write tests against.
+ */
+public interface DistributedStore {
+
+ public List<String> getChildren(String path) throws DistributedStoreException;
+
+ public byte[] get(String path) throws DistributedStoreException;
+
+ public void put(String path, byte[] bs) throws DistributedStoreException;
+
+ public void remove(String path) throws DistributedStoreException;
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
new file mode 100644
index 0000000..3d3a725
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+ /**
+  * Checked exception for failures of a distributed store operation.
+  */
+ public class DistributedStoreException extends Exception {
+
+   private static final long serialVersionUID = 1L;
+
+   /**
+    * @param why
+    *          description of the failure
+    * @param cause
+    *          the underlying exception
+    */
+   public DistributedStoreException(String why, Exception cause) {
+     super(why, cause);
+   }
+
+   /**
+    * @param cause
+    *          the underlying exception
+    */
+   public DistributedStoreException(Exception cause) {
+     super(cause);
+   }
+
+   /**
+    * @param why
+    *          description of the failure
+    */
+   public DistributedStoreException(String why) {
+     super(why);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java
new file mode 100644
index 0000000..708b1b7
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Describes the merge or range delete being performed on part of a table: the extent it covers, which of the two operations it is, and the state it has
+ * reached.
+ * <p>
+ * This is a {@link Writable} so that it can be serialized for zookeeper and the Tablet. The serialized form is the extent followed by the ordinals of the
+ * state and the operation, so the declaration order of {@link MergeState} and {@link Operation} is part of that format.
+ */
+public class MergeInfo implements Writable {
+
+  /** The kind of operation being performed on the extent. */
+  public enum Operation {
+    MERGE, DELETE,
+  }
+
+  MergeState state = MergeState.NONE;
+  KeyExtent extent;
+  Operation operation = Operation.MERGE;
+
+  /** Creates an instance with no extent; one is assigned by {@link #readFields(DataInput)}. */
+  public MergeInfo() {}
+
+  /**
+   * Creates the description of an operation on the given range. The state is left at {@link MergeState#NONE}.
+   *
+   * @param range
+   *          the extent the operation applies to
+   * @param op
+   *          the operation to perform
+   */
+  public MergeInfo(KeyExtent range, Operation op) {
+    this.extent = range;
+    this.operation = op;
+  }
+
+  /**
+   * Replaces the extent, state and operation with the values read from the input, which must be in the form produced by {@link #write(DataOutput)}.
+   *
+   * @throws IOException
+   *           if the input cannot be read, or if a serialized ordinal does not correspond to a known state or operation
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    extent = new KeyExtent();
+    extent.readFields(in);
+    state = fromOrdinal(MergeState.values(), in.readInt(), "merge state");
+    operation = fromOrdinal(Operation.values(), in.readInt(), "merge operation");
+  }
+
+  /**
+   * Returns the constant at the given ordinal. An ordinal outside the array is reported as an {@link IOException} describing the bad input, rather than
+   * surfacing as an {@link ArrayIndexOutOfBoundsException}.
+   */
+  private static <E> E fromOrdinal(E[] values, int ordinal, String what) throws IOException {
+    if (ordinal < 0 || ordinal >= values.length)
+      throw new IOException("Invalid " + what + " ordinal " + ordinal + "; expected 0 to " + (values.length - 1));
+    return values[ordinal];
+  }
+
+  /**
+   * Writes the extent, then the ordinal of the state, then the ordinal of the operation.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    extent.write(out);
+    out.writeInt(state.ordinal());
+    out.writeInt(operation.ordinal());
+  }
+
+  public MergeState getState() {
+    return state;
+  }
+
+  public KeyExtent getExtent() {
+    return extent;
+  }
+
+  public Operation getOperation() {
+    return operation;
+  }
+
+  public void setState(MergeState state) {
+    this.state = state;
+  }
+
+  /**
+   * @return true if the operation is {@link Operation#DELETE}
+   */
+  public boolean isDelete() {
+    return this.operation.equals(Operation.DELETE);
+  }
+
+  /**
+   * Decides whether the tablet with the given extent must be chopped for this operation.
+   *
+   * @param otherExtent
+   *          the extent of the tablet in question
+   * @return false for a tablet of another table; for a delete, true only for the tablet whose previous end row equals the end row of this extent; for a merge,
+   *         true for any tablet overlapping this extent
+   */
+  public boolean needsToBeChopped(KeyExtent otherExtent) {
+    if (!otherExtent.getTableId().equals(extent.getTableId()))
+      return false;
+    if (!isDelete())
+      return this.extent.overlaps(otherExtent);
+    // During a delete, the block after the merge will be stretched to cover the deleted area.
+    // Therefore, it needs to be chopped
+    return otherExtent.getPrevEndRow() != null && otherExtent.getPrevEndRow().equals(extent.getEndRow());
+  }
+
+  /**
+   * @param otherExtent
+   *          the extent of the tablet in question
+   * @return true if the tablet overlaps this extent or {@link #needsToBeChopped(KeyExtent)}
+   */
+  public boolean overlaps(KeyExtent otherExtent) {
+    return this.extent.overlaps(otherExtent) || needsToBeChopped(otherExtent);
+  }
+
+  @Override
+  public String toString() {
+    if (!state.equals(MergeState.NONE))
+      return "Merge " + operation.toString() + " of " + extent + " State: " + state;
+    return "No Merge in progress";
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeState.java
new file mode 100644
index 0000000..29b6ae3
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeState.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+/**
+ * The states recorded for a merge or range delete; see {@link MergeInfo#getState()}.
+ * <p>
+ * {@link MergeInfo} serializes a state as its ordinal, so the constants must keep their current order.
+ */
+public enum MergeState {
+  /** No merge is in progress. */
+  NONE,
+  /** The merge has been created and stored in zookeeper; other merges are prevented on the table. */
+  STARTED,
+  /** All matching tablets are put online, and tablets are split if we are deleting. */
+  SPLITTING,
+  /** After the tablet server chops the file, it marks the metadata table with a chopped marker. */
+  WAITING_FOR_CHOPPED,
+  /** When the number of chopped tablets in the range matches the number of online tablets in the range, the tablets are taken offline. */
+  WAITING_FOR_OFFLINE,
+  /** When the number of chopped, offline tablets equals the number of merge tablets, the metadata updates begin. */
+  MERGING,
+  /** The merge is complete: the resulting tablet can be brought online and the marker in zookeeper removed. */
+  COMPLETE
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
new file mode 100644
index 0000000..082f2ca
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.master.state.CurrentState;
+import org.apache.accumulo.server.master.state.MetaDataTableScanner;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.hadoop.io.Text;
+
+/**
+ * A {@link TabletStateStore} that records tablet locations by writing mutations to a table, which is {@link MetadataTable#NAME} unless a subclass supplies
+ * another name.
+ */
+public class MetaDataStateStore extends TabletStateStore {
+
+  // Settings applied to the BatchWriterConfig built in createBatchWriter().
+  private static final int THREADS = 4; // max write threads
+  private static final int LATENCY = 1000; // max latency, in milliseconds
+  private static final int MAX_MEMORY = 200 * 1024 * 1024; // max memory
+
+  protected final Instance instance;
+  protected final CurrentState state;
+  protected final Credentials credentials;
+  private final String targetTableName;
+
+  /**
+   * @param instance
+   *          used to obtain the connector that creates batch writers
+   * @param credentials
+   *          supplies the principal and token for that connector
+   * @param state
+   *          passed to the {@link MetaDataTableScanner} created by {@link #iterator()}
+   * @param targetTableName
+   *          name of the table the location mutations are written to
+   */
+  protected MetaDataStateStore(Instance instance, Credentials credentials, CurrentState state, String targetTableName) {
+    this.instance = instance;
+    this.state = state;
+    this.credentials = credentials;
+    this.targetTableName = targetTableName;
+  }
+
+  /** Creates a store that writes to {@link MetadataTable#NAME}. */
+  public MetaDataStateStore(Instance instance, Credentials credentials, CurrentState state) {
+    this(instance, credentials, state, MetadataTable.NAME);
+  }
+
+  /** Creates a store for the named table using {@link HdfsZooInstance}, the {@link SystemCredentials} and no {@link CurrentState}. */
+  protected MetaDataStateStore(String tableName) {
+    this(HdfsZooInstance.getInstance(), SystemCredentials.get(), null, tableName);
+  }
+
+  /** Creates a store for {@link MetadataTable#NAME} using {@link HdfsZooInstance}, the {@link SystemCredentials} and no {@link CurrentState}. */
+  public MetaDataStateStore() {
+    this(MetadataTable.NAME);
+  }
+
+  /**
+   * @return a new {@link MetaDataTableScanner} over the range of the tablets section
+   */
+  @Override
+  public Iterator<TabletLocationState> iterator() {
+    return new MetaDataTableScanner(instance, credentials, MetadataSchema.TabletsSection.getRange(), state);
+  }
+
+  /**
+   * For each assignment, writes the server as the tablet's current location and deletes the future location entry for that server.
+   *
+   * @throws DistributedStoreException
+   *           if the mutations cannot be built or written
+   */
+  @Override
+  public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
+    BatchWriter writer = createBatchWriter();
+    boolean written = false;
+    try {
+      for (Assignment assignment : assignments) {
+        Mutation m = new Mutation(assignment.tablet.getMetadataEntry());
+        Text cq = assignment.server.asColumnQualifier();
+        m.put(TabletsSection.CurrentLocationColumnFamily.NAME, cq, assignment.server.asMutationValue());
+        m.putDelete(TabletsSection.FutureLocationColumnFamily.NAME, cq);
+        writer.addMutation(m);
+      }
+      written = true;
+    } catch (Exception ex) {
+      throw new DistributedStoreException(ex);
+    } finally {
+      closeWriter(writer, written);
+    }
+  }
+
+  /**
+   * Creates a batch writer for the target table with this class's memory, latency and thread settings.
+   *
+   * @throws RuntimeException
+   *           wrapping any failure to obtain the connector or create the writer
+   */
+  BatchWriter createBatchWriter() {
+    try {
+      return instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createBatchWriter(targetTableName,
+          new BatchWriterConfig().setMaxMemory(MAX_MEMORY).setMaxLatency(LATENCY, TimeUnit.MILLISECONDS).setMaxWriteThreads(THREADS));
+    } catch (TableNotFoundException e) {
+      // ya, I don't think so
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Closes the writer after a batch of updates.
+   * <p>
+   * This is called from {@code finally} blocks, where throwing would discard an exception that is already propagating. A rejection reported by
+   * {@code close()} is therefore only rethrown when every mutation was added successfully; otherwise the earlier failure is the one the caller sees.
+   *
+   * @param writer
+   *          the writer to close
+   * @param written
+   *          true if all mutations were added without an exception
+   * @throws DistributedStoreException
+   *           if closing rejects mutations and no earlier failure is propagating
+   */
+  private static void closeWriter(BatchWriter writer, boolean written) throws DistributedStoreException {
+    try {
+      writer.close();
+    } catch (MutationsRejectedException e) {
+      if (written)
+        throw new DistributedStoreException(e);
+      // otherwise an earlier failure is already propagating and must not be replaced
+    }
+  }
+
+  /**
+   * For each assignment, writes the server as the tablet's future location.
+   *
+   * @throws DistributedStoreException
+   *           if the mutations cannot be built or written
+   */
+  @Override
+  public void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException {
+    BatchWriter writer = createBatchWriter();
+    boolean written = false;
+    try {
+      for (Assignment assignment : assignments) {
+        Mutation m = new Mutation(assignment.tablet.getMetadataEntry());
+        m.put(TabletsSection.FutureLocationColumnFamily.NAME, assignment.server.asColumnQualifier(), assignment.server.asMutationValue());
+        writer.addMutation(m);
+      }
+      written = true;
+    } catch (Exception ex) {
+      throw new DistributedStoreException(ex);
+    } finally {
+      closeWriter(writer, written);
+    }
+  }
+
+  /**
+   * For each tablet, deletes whichever of its current and future location entries are set.
+   *
+   * @throws DistributedStoreException
+   *           if the mutations cannot be built or written
+   */
+  @Override
+  public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
+    BatchWriter writer = createBatchWriter();
+    boolean written = false;
+    try {
+      for (TabletLocationState tls : tablets) {
+        Mutation m = new Mutation(tls.extent.getMetadataEntry());
+        if (tls.current != null) {
+          m.putDelete(TabletsSection.CurrentLocationColumnFamily.NAME, tls.current.asColumnQualifier());
+        }
+        if (tls.future != null) {
+          m.putDelete(TabletsSection.FutureLocationColumnFamily.NAME, tls.future.asColumnQualifier());
+        }
+        writer.addMutation(m);
+      }
+      written = true;
+    } catch (Exception ex) {
+      throw new DistributedStoreException(ex);
+    } finally {
+      closeWriter(writer, written);
+    }
+  }
+
+  @Override
+  public String name() {
+    return "Normal Tablets";
+  }
+}