You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/01/27 20:35:33 UTC
svn commit: r1236853 - in /incubator/accumulo/branches/1.4/src/server/src:
main/java/org/apache/accumulo/server/master/
main/java/org/apache/accumulo/server/master/state/
main/java/org/apache/accumulo/server/master/tableOps/
main/java/org/apache/accumu...
Author: ecn
Date: Fri Jan 27 19:35:32 2012
New Revision: 1236853
URL: http://svn.apache.org/viewvc?rev=1236853&view=rev
Log:
ACCUMULO-315: add second consistency check prior to performing any merge operation
- added code to do a consistent scan of the metadata
- re-organized the MetaDataStateStore to work in a unit test
- write a unit test against the case discovered in ACCUMULO-315
Added:
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java (with props)
incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java (with props)
Modified:
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tserverOps/FlushTablets.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/monitor/servlets/TablesServlet.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/TableOp.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1236853&r1=1236852&r2=1236853&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java Fri Jan 27 19:35:32 2012
@@ -110,6 +110,7 @@ import org.apache.accumulo.server.master
import org.apache.accumulo.server.master.state.DistributedStoreException;
import org.apache.accumulo.server.master.state.MergeInfo;
import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.accumulo.server.master.state.MergeStats;
import org.apache.accumulo.server.master.state.MetaDataStateStore;
import org.apache.accumulo.server.master.state.RootTabletStateStore;
import org.apache.accumulo.server.master.state.TServerInstance;
@@ -196,52 +197,6 @@ public class Master implements LiveTServ
final private static int MAX_TSERVER_WORK_CHUNK = 5000;
final private static int MAX_BAD_STATUS_COUNT = 3;
- static class MergeStats {
- MergeInfo info;
- int hosted = 0;
- int unassigned = 0;
- int chopped = 0;
- int needsToBeChopped = 0;
- int total = 0;
- boolean lowerSplit = false;
- boolean upperSplit = false;
-
- public MergeStats(MergeInfo info) {
- this.info = info;
- if (info.getState().equals(MergeState.NONE))
- return;
- if (info.getRange().getEndRow() == null)
- upperSplit = true;
- if (info.getRange().getPrevEndRow() == null)
- lowerSplit = true;
- }
-
- void update(KeyExtent ke, TabletState state, boolean chopped) {
- if (info.getState().equals(MergeState.NONE))
- return;
- if (!upperSplit && info.getRange().getEndRow().equals(ke.getPrevEndRow())) {
- log.info("Upper split found");
- upperSplit = true;
- }
- if (!lowerSplit && info.getRange().getPrevEndRow().equals(ke.getEndRow())) {
- log.info("Lower split found");
- lowerSplit = true;
- }
- if (!info.overlaps(ke))
- return;
- if (info.needsToBeChopped(ke)) {
- this.needsToBeChopped++;
- if (chopped)
- this.chopped++;
- }
- this.total++;
- if (state.equals(TabletState.HOSTED))
- this.hosted++;
- if (state.equals(TabletState.UNASSIGNED))
- this.unassigned++;
- }
- }
-
final private Instance instance;
final private String hostname;
final private FileSystem fs;
@@ -1366,13 +1321,13 @@ public class Master implements LiveTServ
if (mergeStats == null) {
mergeStatsCache.put(tableId, mergeStats = new MergeStats(getMergeInfo(tableId)));
}
- TabletGoalState goal = getGoalState(tls, mergeStats.info);
+ TabletGoalState goal = getGoalState(tls, mergeStats.getMergeInfo());
TServerInstance server = tls.getServer();
TabletState state = tls.getState(currentTServers.keySet());
stats.update(tableId, state);
mergeStats.update(tls.extent, state, tls.chopped);
- sendChopRequest(mergeStats.info, state, tls);
- sendSplitRequest(mergeStats.info, state, tls);
+ sendChopRequest(mergeStats.getMergeInfo(), state, tls);
+ sendSplitRequest(mergeStats.getMergeInfo(), state, tls);
// Always follow through with assignments
if (state == TabletState.ASSIGNED) {
@@ -1529,84 +1484,31 @@ public class Master implements LiveTServ
private void updateMergeState(Map<Text,MergeStats> mergeStatsCache) {
for (MergeStats stats : mergeStatsCache.values()) {
- MergeState state = stats.info.getState();
try {
- if (state == MergeState.STARTED) {
- setMergeState(stats.info, state = MergeState.SPLITTING);
- }
- if (state == MergeState.SPLITTING) {
- log.info(stats.hosted + " are hosted, total " + stats.total);
- if (!stats.info.isDelete() && stats.total == 1) {
- log.info("Merge range is already contained in a single tablet");
- setMergeState(stats.info, state = MergeState.COMPLETE);
- } else if (stats.hosted == stats.total) {
- if (stats.info.isDelete()) {
- if (!stats.lowerSplit)
- log.info("Waiting for " + stats.info + " lower split to occur");
- else if (!stats.upperSplit)
- log.info("Waiting for " + stats.info + " upper split to occur");
- else
- setMergeState(stats.info, state = MergeState.WAITING_FOR_CHOPPED);
- } else {
- setMergeState(stats.info, state = MergeState.WAITING_FOR_CHOPPED);
- }
- } else {
- log.info("Waiting for " + stats.hosted + " hosted tablets to be " + stats.total);
- }
- }
- if (state == MergeState.WAITING_FOR_CHOPPED) {
- log.info(stats.chopped + " tablets are chopped");
- if (stats.chopped == stats.needsToBeChopped) {
- setMergeState(stats.info, state = MergeState.WAITING_FOR_OFFLINE);
- } else {
- log.info("Waiting for " + stats.chopped + " chopped tablets to be " + stats.needsToBeChopped);
- }
- }
- if (state == MergeState.WAITING_FOR_OFFLINE) {
- if (stats.chopped != stats.needsToBeChopped) {
- log.warn("Unexpected state: chopped tablets should be " + stats.needsToBeChopped + " was " + stats.chopped + " merge " + stats.info.getRange());
- // Perhaps a split occurred after we chopped, but before we went offline: start over
- setMergeState(stats.info, state = MergeState.SPLITTING);
- } else {
- log.info(stats.chopped + " tablets are chopped, " + stats.unassigned + " are offline");
- if (stats.unassigned == stats.total && stats.chopped == stats.needsToBeChopped) {
- setMergeState(stats.info, state = MergeState.MERGING);
- } else {
- log.info("Waiting for " + stats.unassigned + " unassigned tablets to be " + stats.total);
- }
- }
- }
- if (state == MergeState.MERGING) {
- if (stats.hosted != 0) {
- // Shouldn't happen
- log.error("Unexpected state: hosted tablets should be zero " + stats.hosted + " merge " + stats.info.getRange());
- }
- if (stats.unassigned != stats.total) {
- // Shouldn't happen
- log.error("Unexpected state: unassigned tablets should be " + stats.total + " was " + stats.unassigned + " merge " + stats.info.getRange());
- }
- log.info(stats.unassigned + " tablets are unassigned");
- if (stats.hosted == 0 && stats.unassigned == stats.total) {
- try {
- if (stats.info.isDelete())
- deleteTablets(stats.info);
- else
- mergeMetadataRecords(stats.info);
- setMergeState(stats.info, state = MergeState.COMPLETE);
- } catch (Exception ex) {
- log.error("Unable merge metadata table records", ex);
+ MergeState update = stats.nextMergeState();
+ if (update != stats.getMergeInfo().getState()) {
+ if (update == MergeState.MERGING) {
+ if (stats.verifyMergeConsistency(getConnector(), Master.this)) {
+ try {
+ if (stats.getMergeInfo().isDelete())
+ deleteTablets(stats.getMergeInfo());
+ else
+ mergeMetadataRecords(stats.getMergeInfo());
+ setMergeState(stats.getMergeInfo(), MergeState.COMPLETE);
+ update = MergeState.NONE;
+ } catch (Exception ex) {
+ log.error("Unable merge metadata table records", ex);
+ }
}
}
- }
- if (state == MergeState.COMPLETE) {
- setMergeState(stats.info, MergeState.NONE);
+ setMergeState(stats.getMergeInfo(), update);
}
} catch (Exception ex) {
- log.error("Unable to update merge state for merge " + stats.info.getRange(), ex);
+ log.error("Unable to update merge state for merge " + stats.getMergeInfo().getRange(), ex);
}
}
}
-
+
private void deleteTablets(MergeInfo info) throws AccumuloException {
KeyExtent range = info.getRange();
log.debug("Deleting tablets for " + range);
@@ -1858,6 +1760,7 @@ public class Master implements LiveTServ
}
+
private class MigrationCleanupThread extends Daemon {
public void run() {
@@ -2113,7 +2016,9 @@ public class Master implements LiveTServ
tserverSet.startListeningForTabletServerChanges();
- final TabletStateStore stores[] = {new ZooTabletStateStore(new ZooStore(zroot)), new RootTabletStateStore(this), new MetaDataStateStore(this)};
+ AuthInfo systemAuths = SecurityConstants.getSystemCredentials();
+ final TabletStateStore stores[] = {new ZooTabletStateStore(new ZooStore(zroot)), new RootTabletStateStore(instance, systemAuths, this),
+ new MetaDataStateStore(instance, systemAuths, this)};
for (int i = 0; i < stores.length; i++) {
watchers.add(new TabletGroupWatcher(stores[i]));
}
Added: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java?rev=1236853&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java (added)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java Fri Jan 27 19:35:32 2012
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class MergeStats {
+ final static private Logger log = Logger.getLogger(MergeStats.class);
+ MergeInfo info;
+ int hosted = 0;
+ int unassigned = 0;
+ int chopped = 0;
+ int needsToBeChopped = 0;
+ int total = 0;
+ boolean lowerSplit = false;
+ boolean upperSplit = false;
+
+ public MergeStats(MergeInfo info) {
+ this.info = info;
+ if (info.getState().equals(MergeState.NONE))
+ return;
+ if (info.getRange().getEndRow() == null)
+ upperSplit = true;
+ if (info.getRange().getPrevEndRow() == null)
+ lowerSplit = true;
+ }
+
+ public MergeInfo getMergeInfo() {
+ return info;
+ }
+
+ public void update(KeyExtent ke, TabletState state, boolean chopped) {
+ if (info.getState().equals(MergeState.NONE))
+ return;
+ if (!upperSplit && info.getRange().getEndRow().equals(ke.getPrevEndRow())) {
+ log.info("Upper split found");
+ upperSplit = true;
+ }
+ if (!lowerSplit && info.getRange().getPrevEndRow().equals(ke.getEndRow())) {
+ log.info("Lower split found");
+ lowerSplit = true;
+ }
+ if (!info.overlaps(ke))
+ return;
+ if (info.needsToBeChopped(ke)) {
+ this.needsToBeChopped++;
+ if (chopped)
+ this.chopped++;
+ }
+ this.total++;
+ if (state.equals(TabletState.HOSTED))
+ this.hosted++;
+ if (state.equals(TabletState.UNASSIGNED))
+ this.unassigned++;
+ }
+
+ public MergeState nextMergeState() throws Exception {
+ MergeState state = info.getState();
+ if (state == MergeState.STARTED) {
+ state = MergeState.SPLITTING;
+ }
+ if (state == MergeState.SPLITTING) {
+ log.info(hosted + " are hosted, total " + total);
+ if (!info.isDelete() && total == 1) {
+ log.info("Merge range is already contained in a single tablet");
+ state = MergeState.COMPLETE;
+ } else if (hosted == total) {
+ if (info.isDelete()) {
+ if (!lowerSplit)
+ log.info("Waiting for " + info + " lower split to occur");
+ else if (!upperSplit)
+ log.info("Waiting for " + info + " upper split to occur");
+ else
+ state = MergeState.WAITING_FOR_CHOPPED;
+ } else {
+ state = MergeState.WAITING_FOR_CHOPPED;
+ }
+ } else {
+ log.info("Waiting for " + hosted + " hosted tablets to be " + total);
+ }
+ }
+ if (state == MergeState.WAITING_FOR_CHOPPED) {
+ log.info(chopped + " tablets are chopped");
+ if (chopped == needsToBeChopped) {
+ state = MergeState.WAITING_FOR_OFFLINE;
+ } else {
+ log.info("Waiting for " + chopped + " chopped tablets to be " + needsToBeChopped);
+ }
+ }
+ if (state == MergeState.WAITING_FOR_OFFLINE) {
+ if (chopped != needsToBeChopped) {
+ log.warn("Unexpected state: chopped tablets should be " + needsToBeChopped + " was " + chopped + " merge " + info.getRange());
+ // Perhaps a split occurred after we chopped, but before we went offline: start over
+ state = MergeState.WAITING_FOR_CHOPPED;
+ } else {
+ log.info(chopped + " tablets are chopped, " + unassigned + " are offline");
+ if (unassigned == total && chopped == needsToBeChopped) {
+ state = MergeState.MERGING;
+ } else {
+ log.info("Waiting for " + unassigned + " unassigned tablets to be " + total);
+ }
+ }
+ }
+ if (state == MergeState.MERGING) {
+ if (hosted != 0) {
+ // Shouldn't happen
+ log.error("Unexpected state: hosted tablets should be zero " + hosted + " merge " + info.getRange());
+ state = MergeState.WAITING_FOR_OFFLINE;
+ }
+ if (unassigned != total) {
+ // Shouldn't happen
+ log.error("Unexpected state: unassigned tablets should be " + total + " was " + unassigned + " merge " + info.getRange());
+ state = MergeState.WAITING_FOR_CHOPPED;
+ }
+ log.info(unassigned + " tablets are unassigned");
+ }
+ return state;
+ }
+
+ public boolean verifyMergeConsistency(Connector connector, CurrentState master) throws TableNotFoundException, IOException {
+ MergeStats verify = new MergeStats(info);
+ Scanner scanner = connector.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+ MetaDataTableScanner.configureScanner(scanner, master);
+ KeyExtent extent = info.getRange();
+ Text start = extent.getPrevEndRow();
+ if (start == null) {
+ start = new Text();
+ }
+ Text tableId = extent.getTableId();
+ Text first = KeyExtent.getMetadataEntry(tableId, start);
+ Range range = new Range(first, true, null, true);
+ scanner.setRange(range);
+ Text pr = null;
+ for (Entry<Key,Value> entry : scanner) {
+ TabletLocationState tls = MetaDataTableScanner.createTabletLocationState(entry.getKey(), entry.getValue());
+ verify.update(tls.extent, tls.getState(master.onlineTabletServers()), tls.chopped);
+ if (pr != null && !tls.extent.getPrevEndRow().equals(pr))
+ return false;
+ pr = tls.extent.getEndRow();
+ if (tls.extent.getPrevEndRow().compareTo(extent.getEndRow()) > 0) {
+ break;
+ }
+ }
+ return chopped == verify.chopped && unassigned == verify.unassigned && unassigned == verify.total;
+ }
+}
\ No newline at end of file
Propchange: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java?rev=1236853&r1=1236852&r2=1236853&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java Fri Jan 27 19:35:32 2012
@@ -21,9 +21,11 @@ import java.util.Iterator;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.impl.BatchWriterImpl;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.hadoop.io.Text;
@@ -35,15 +37,23 @@ public class MetaDataStateStore extends
private static final int LATENCY = 1000;
private static final int MAX_MEMORY = 200 * 1024 * 1024;
+ final protected Instance instance;
final protected CurrentState state;
+ final protected AuthInfo auths;
- public MetaDataStateStore(CurrentState state) {
+ public MetaDataStateStore(Instance instance, AuthInfo auths, CurrentState state) {
+ this.instance = instance;
this.state = state;
+ this.auths = auths;
}
+ public MetaDataStateStore() {
+ this(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), null);
+ }
+
@Override
public Iterator<TabletLocationState> iterator() {
- return new MetaDataTableScanner(Constants.NON_ROOT_METADATA_KEYSPACE, state);
+ return new MetaDataTableScanner(instance, auths, Constants.NON_ROOT_METADATA_KEYSPACE, state);
}
@Override
@@ -69,9 +79,14 @@ public class MetaDataStateStore extends
}
BatchWriter createBatchWriter() {
- BatchWriter writer = new BatchWriterImpl(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), Constants.METADATA_TABLE_ID, MAX_MEMORY,
- LATENCY, THREADS);
- return writer;
+ try {
+ return instance.getConnector(auths).createBatchWriter(Constants.METADATA_TABLE_NAME, MAX_MEMORY, LATENCY, THREADS);
+ } catch (TableNotFoundException e) {
+ // ya, I don't think so
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
@Override
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java?rev=1236853&r1=1236852&r2=1236853&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java Fri Jan 27 19:35:32 2012
@@ -32,15 +32,16 @@ import java.util.SortedMap;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.core.util.ColumnFQ;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -50,33 +51,37 @@ public class MetaDataTableScanner implem
BatchScanner mdScanner;
Iterator<Entry<Key,Value>> iter;
- public MetaDataTableScanner(Range range, CurrentState state) {
+ public MetaDataTableScanner(Instance instance, AuthInfo auths, Range range, CurrentState state) {
// scan over metadata table, looking for tablets in the wrong state based on the live servers and online tables
try {
- Connector connector = HdfsZooInstance.getInstance().getConnector(SecurityConstants.getSystemCredentials());
+ Connector connector = instance.getConnector(auths);
mdScanner = connector.createBatchScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS, 8);
- ColumnFQ.fetch(mdScanner, Constants.METADATA_PREV_ROW_COLUMN);
- mdScanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
- mdScanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY);
- mdScanner.fetchColumnFamily(Constants.METADATA_LOG_COLUMN_FAMILY);
- mdScanner.fetchColumnFamily(Constants.METADATA_CHOPPED_COLUMN_FAMILY);
+ configureScanner(mdScanner, state);
mdScanner.setRanges(Collections.singletonList(range));
- mdScanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class));
- IteratorSetting tabletChange = new IteratorSetting(1001, "tabletChange", TabletStateChangeIterator.class);
- if (state != null) {
- TabletStateChangeIterator.setCurrentServers(tabletChange, state.onlineTabletServers());
- TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables());
- TabletStateChangeIterator.setMerges(tabletChange, state.merges());
- }
- mdScanner.addScanIterator(tabletChange);
iter = mdScanner.iterator();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
+
+ static public void configureScanner(ScannerBase scanner, CurrentState state) {
+ ColumnFQ.fetch(scanner, Constants.METADATA_PREV_ROW_COLUMN);
+ scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
+ scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY);
+ scanner.fetchColumnFamily(Constants.METADATA_LOG_COLUMN_FAMILY);
+ scanner.fetchColumnFamily(Constants.METADATA_CHOPPED_COLUMN_FAMILY);
+ scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class));
+ IteratorSetting tabletChange = new IteratorSetting(1001, "tabletChange", TabletStateChangeIterator.class);
+ if (state != null) {
+ TabletStateChangeIterator.setCurrentServers(tabletChange, state.onlineTabletServers());
+ TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables());
+ TabletStateChangeIterator.setMerges(tabletChange, state.merges());
+ }
+ scanner.addScanIterator(tabletChange);
+ }
- public MetaDataTableScanner(Range range) {
- this(range, null);
+ public MetaDataTableScanner(Instance instance, AuthInfo auths, Range range) {
+ this(instance, auths, range, null);
}
public void close() {
@@ -115,7 +120,8 @@ public class MetaDataTableScanner implem
}
}
- public static TabletLocationState createTabletLocationState(SortedMap<Key,Value> decodedRow) {
+ public static TabletLocationState createTabletLocationState(Key k, Value v) throws IOException {
+ final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v);
KeyExtent extent = null;
TServerInstance future = null;
TServerInstance current = null;
@@ -152,10 +158,9 @@ public class MetaDataTableScanner implem
}
private TabletLocationState fetch() {
- Entry<Key,Value> entry = iter.next();
try {
- final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(entry.getKey(), entry.getValue());
- return createTabletLocationState(decodedRow);
+ Entry<Key,Value> e = iter.next();
+ return createTabletLocationState(e.getKey(), e.getValue());
} catch (IOException ex) {
throw new RuntimeException(ex);
}
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java?rev=1236853&r1=1236852&r2=1236853&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java Fri Jan 27 19:35:32 2012
@@ -19,21 +19,23 @@ package org.apache.accumulo.server.maste
import java.util.Iterator;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.hadoop.io.Text;
public class RootTabletStateStore extends MetaDataStateStore {
- public RootTabletStateStore(CurrentState state) {
- super(state);
+ public RootTabletStateStore(Instance instance, AuthInfo auths, CurrentState state) {
+ super(instance, auths, state);
}
@Override
public Iterator<TabletLocationState> iterator() {
Range range = new Range(Constants.ROOT_TABLET_EXTENT.getMetadataEntry(), false, KeyExtent.getMetadataEntry(new Text(Constants.METADATA_TABLE_ID), null),
true);
- return new MetaDataTableScanner(range, state);
+ return new MetaDataTableScanner(instance, auths, range, state);
}
@Override
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java?rev=1236853&r1=1236852&r2=1236853&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java Fri Jan 27 19:35:32 2012
@@ -118,7 +118,7 @@ public class TabletStateChangeIterator e
return;
SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v);
- TabletLocationState tls = MetaDataTableScanner.createTabletLocationState(decodedRow);
+ TabletLocationState tls = MetaDataTableScanner.createTabletLocationState(k, v);
// we always want data about merges
MergeInfo merge = merges.get(tls.extent.getTableId());
if (merge != null && merge.getRange() != null && merge.getRange().overlaps(tls.extent)) {
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java?rev=1236853&r1=1236852&r2=1236853&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java Fri Jan 27 19:35:32 2012
@@ -77,7 +77,7 @@ public abstract class TabletStateStore i
if (tls.extent.equals(Constants.ROOT_TABLET_EXTENT)) {
store = new ZooTabletStateStore();
} else {
- store = new MetaDataStateStore(null);
+ store = new MetaDataStateStore();
}
store.unassign(Collections.singletonList(tls));
}
@@ -87,7 +87,7 @@ public abstract class TabletStateStore i
if (assignment.tablet.equals(Constants.ROOT_TABLET_EXTENT)) {
store = new ZooTabletStateStore();
} else {
- store = new MetaDataStateStore(null);
+ store = new MetaDataStateStore();
}
store.setLocations(Collections.singletonList(assignment));
}
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java?rev=1236853&r1=1236852&r2=1236853&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java Fri Jan 27 19:35:32 2012
@@ -89,7 +89,7 @@ class CleanUp extends MasterRepo {
boolean done = true;
Range tableRange = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
- MetaDataTableScanner metaDataTableScanner = new MetaDataTableScanner(tableRange, null);
+ MetaDataTableScanner metaDataTableScanner = new MetaDataTableScanner(environment.getInstance(), SecurityConstants.getSystemCredentials(), tableRange, null);
try {
while (metaDataTableScanner.hasNext()) {
TabletLocationState locationState = metaDataTableScanner.next();
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tserverOps/FlushTablets.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tserverOps/FlushTablets.java?rev=1236853&r1=1236852&r2=1236853&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tserverOps/FlushTablets.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tserverOps/FlushTablets.java Fri Jan 27 19:35:32 2012
@@ -19,9 +19,9 @@ package org.apache.accumulo.server.maste
import java.util.Collection;
import org.apache.accumulo.server.fate.Repo;
-import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.EventCoordinator.Listener;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.DistributedStoreException;
import org.apache.accumulo.server.master.state.MetaDataStateStore;
import org.apache.accumulo.server.master.state.TabletLocationState;
@@ -58,7 +58,7 @@ public class FlushTablets extends Master
} catch (DistributedStoreException e) {
log.warn("Unable to open ZooTabletStateStore, will retry", e);
}
- MetaDataStateStore theRest = new MetaDataStateStore(null);
+ MetaDataStateStore theRest = new MetaDataStateStore();
for (TabletStateStore store : new TabletStateStore[] {zooTabletStateStore, theRest}) {
if (store != null) {
for (TabletLocationState tabletState : store) {
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/monitor/servlets/TablesServlet.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/monitor/servlets/TablesServlet.java?rev=1236853&r1=1236852&r2=1236853&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/monitor/servlets/TablesServlet.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/monitor/servlets/TablesServlet.java Fri Jan 27 19:35:32 2012
@@ -27,6 +27,7 @@ import java.util.TreeSet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Range;
@@ -44,6 +45,7 @@ import org.apache.accumulo.server.monito
import org.apache.accumulo.server.monitor.util.celltypes.NumberType;
import org.apache.accumulo.server.monitor.util.celltypes.TableLinkType;
import org.apache.accumulo.server.monitor.util.celltypes.TableStateType;
+import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.hadoop.io.Text;
public class TablesServlet extends BasicServlet {
@@ -138,8 +140,10 @@ public class TablesServlet extends Basic
private void doTableDetails(HttpServletRequest req, StringBuilder sb, Map<String,String> tidToNameMap, String tableId) {
String displayName = Tables.getPrintableTableNameFromId(tidToNameMap, tableId);
-
- MetaDataTableScanner scanner = new MetaDataTableScanner(new Range(KeyExtent.getMetadataEntry(new Text(tableId), new Text()), KeyExtent.getMetadataEntry(
+ Instance instance = HdfsZooInstance.getInstance();
+ MetaDataTableScanner scanner = new MetaDataTableScanner(instance, SecurityConstants.getSystemCredentials(), new Range(KeyExtent.getMetadataEntry(new Text(
+ tableId), new Text()),
+ KeyExtent.getMetadataEntry(
new Text(tableId), null)));
TreeSet<String> locs = new TreeSet<String>();
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java?rev=1236853&r1=1236852&r2=1236853&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java Fri Jan 27 19:35:32 2012
@@ -54,6 +54,7 @@ import org.apache.accumulo.server.master
import org.apache.accumulo.server.master.state.MetaDataTableScanner;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.util.TServerUtils;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.hadoop.io.Text;
@@ -207,7 +208,7 @@ public class NullTserver {
// read the locations for the table
Range tableRange = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
- MetaDataTableScanner s = new MetaDataTableScanner(tableRange);
+ MetaDataTableScanner s = new MetaDataTableScanner(zki, SecurityConstants.getSystemCredentials(), tableRange);
long randomSessionID = port;
TServerInstance instance = new TServerInstance(addr, randomSessionID);
List<Assignment> assignments = new ArrayList<Assignment>();
@@ -217,7 +218,7 @@ public class NullTserver {
}
s.close();
// point them to this server
- MetaDataStateStore store = new MetaDataStateStore(null);
+ MetaDataStateStore store = new MetaDataStateStore();
store.setLocations(assignments);
while (true) {
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java?rev=1236853&r1=1236852&r2=1236853&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java Fri Jan 27 19:35:32 2012
@@ -20,6 +20,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
@@ -240,16 +241,21 @@ public class Module extends Node {
nextNodeId = ((Alias) nextNode).getTargetId();
nextNode = ((Alias) nextNode).get();
}
+ Properties nodeProps = getProps(nextNodeId);
try {
test = false;
if (nextNode instanceof Test) {
startTimer(nextNode.toString());
test = true;
}
- nextNode.visit(state, getProps(nextNodeId));
+ nextNode.visit(state, nodeProps);
if (test)
stopTimer(nextNode.toString());
} catch (Exception e) {
+ log.debug("Properties for node: " + nextNodeId);
+ for (Entry<Object,Object> entry : nodeProps.entrySet()) {
+ log.debug(" " + entry.getKey() + ": " + entry.getValue());
+ }
throw new Exception("Error running node " + nextNodeId, e);
}
state.visitedNode();
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/TableOp.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/TableOp.java?rev=1236853&r1=1236852&r2=1236853&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/TableOp.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/TableOp.java Fri Jan 27 19:35:32 2012
@@ -92,10 +92,10 @@ public class TableOp extends Test {
Key k = entry.getKey();
seen++;
if (!auths.contains(k.getColumnVisibilityData()))
- throw new AccumuloException("Got data I should not be capable of seeing");
+ throw new AccumuloException("Got data I should not be capable of seeing: " + k + " table " + tableName);
}
if (!canRead)
- throw new AccumuloException("Was able to read when I shouldn't have had the perm with connection user " + conn.whoami());
+ throw new AccumuloException("Was able to read when I shouldn't have had the perm with connection user " + conn.whoami() + " table " + tableName);
for (Entry<String,Integer> entry : SecurityHelper.getAuthsMap(state).entrySet()) {
if (auths.contains(entry.getKey().getBytes()))
seen = seen - entry.getValue();
@@ -104,12 +104,12 @@ public class TableOp extends Test {
throw new AccumuloException("Got mismatched amounts of data");
} catch (TableNotFoundException tnfe) {
if (tableExists)
- throw new AccumuloException("Accumulo and test suite out of sync", tnfe);
+ throw new AccumuloException("Accumulo and test suite out of sync: table " + tableName, tnfe);
return;
} catch (AccumuloSecurityException ae) {
if (ae.getErrorCode().equals(SecurityErrorCode.PERMISSION_DENIED)) {
if (canRead)
- throw new AccumuloException("Table read permission out of sync with Accumulo", ae);
+ throw new AccumuloException("Table read permission out of sync with Accumulo: table " + tableName, ae);
else
return;
}
@@ -118,7 +118,7 @@ public class TableOp extends Test {
if (re.getCause() instanceof AccumuloSecurityException
&& ((AccumuloSecurityException) re.getCause()).getErrorCode().equals(SecurityErrorCode.PERMISSION_DENIED)) {
if (canRead)
- throw new AccumuloException("Table read permission out of sync with Accumulo", re.getCause());
+ throw new AccumuloException("Table read permission out of sync with Accumulo: table " + tableName, re.getCause());
else
return;
}
@@ -137,7 +137,7 @@ public class TableOp extends Test {
writer = conn.createBatchWriter(tableName, 9000l, 0l, 1);
} catch (TableNotFoundException tnfe) {
if (tableExists)
- throw new AccumuloException("Table didn't exist when it should have");
+ throw new AccumuloException("Table didn't exist when it should have: " + tableName);
return;
}
boolean works = true;
@@ -171,12 +171,12 @@ public class TableOp extends Test {
conn.tableOperations().importDirectory(tableName, dir.toString(), fail.toString(), true);
} catch (TableNotFoundException tnfe) {
if (tableExists)
- throw new AccumuloException("Table didn't exist when it should have");
+ throw new AccumuloException("Table didn't exist when it should have: " + tableName);
return;
} catch (AccumuloSecurityException ae) {
if (ae.getErrorCode().equals(SecurityErrorCode.PERMISSION_DENIED)) {
if (hasPerm)
- throw new AccumuloException("Bulk Import failed when it should have worked.");
+ throw new AccumuloException("Bulk Import failed when it should have worked: " + tableName);
return;
}
throw new AccumuloException("Unexpected exception!", ae);
@@ -185,7 +185,7 @@ public class TableOp extends Test {
SecurityHelper.increaseAuthMap(state, s, 1);
if (!hasPerm)
- throw new AccumuloException("Bulk Import succeeded when it should have failed.");
+ throw new AccumuloException("Bulk Import succeeded when it should have failed: " + dir + " table " + tableName);
break;
case ALTER_TABLE:
AlterTable.renameTable(conn, state, tableName, tableName + "plus", hasPerm, tableExists);
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java?rev=1236853&r1=1236852&r2=1236853&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java Fri Jan 27 19:35:32 2012
@@ -23,7 +23,7 @@ import org.apache.accumulo.server.conf.S
public class AddressUtil {
static public InetSocketAddress parseAddress(String address, Property portDefaultProperty) {
- final int dfaultPort = ServerConfiguration.getSystemConfiguration().getPort(Property.TSERV_CLIENTPORT);
+ final int dfaultPort = ServerConfiguration.getDefaultConfiguration().getPort(Property.TSERV_CLIENTPORT);
return org.apache.accumulo.core.util.AddressUtil.parseAddress(address, dfaultPort);
}
Added: incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java?rev=1236853&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java (added)
+++ incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java Fri Jan 27 19:35:32 2012
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.CurrentState;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.accumulo.server.master.state.MergeStats;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletState;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class TestMergeState {
+
+ class MockCurrentState implements CurrentState {
+
+ TServerInstance someTServer = new TServerInstance(new InetSocketAddress("127.0.0.1", 1234), 0x123456);
+ MergeInfo mergeInfo;
+
+ MockCurrentState(MergeInfo info) {
+ this.mergeInfo = info;
+ }
+
+ @Override
+ public Set<String> onlineTables() {
+ return Collections.singleton("t");
+ }
+
+ @Override
+ public Set<TServerInstance> onlineTabletServers() {
+ return Collections.singleton(someTServer);
+ }
+
+ @Override
+ public Collection<MergeInfo> merges() {
+ return Collections.singleton(mergeInfo);
+ }
+ }
+
+ private static void update(Connector c, Mutation m) throws TableNotFoundException, MutationsRejectedException {
+ BatchWriter bw = c.createBatchWriter(Constants.METADATA_TABLE_NAME, 1000l, 1000l, 1);
+ bw.addMutation(m);
+ bw.close();
+ }
+
+ @Test
+ public void test() throws Exception {
+ Instance instance = new MockInstance();
+ Connector connector = instance.getConnector("root", "secret");
+ BatchWriter bw = connector.createBatchWriter("!METADATA", 1000l, 1000l, 1);
+
+ // Create a fake METADATA table with these splits
+ String splits[] = {"a", "e", "j", "o", "t", "z"};
+ // create metadata for a table "t" with the splits above
+ Text tableId = new Text("t");
+ Text pr = null;
+ for (String s : splits) {
+ Text split = new Text(s);
+ Mutation prevRow = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, split, pr));
+ prevRow.put(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY, new Text("123456"), new Value("127.0.0.1:1234".getBytes()));
+ ColumnFQ.put(prevRow, Constants.METADATA_CHOPPED_COLUMN, new Value("junk".getBytes()));
+ bw.addMutation(prevRow);
+ pr = split;
+ }
+ // Add the default tablet
+ Mutation defaultTablet = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, null, pr));
+ defaultTablet.put(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY, new Text("123456"), new Value("127.0.0.1:1234".getBytes()));
+ bw.addMutation(defaultTablet);
+ bw.close();
+
+ // Read out the TabletLocationStates
+ MockCurrentState state = new MockCurrentState(new MergeInfo(new KeyExtent(tableId, new Text("p"), new Text("e")), MergeInfo.Operation.MERGE));
+ AuthInfo auths = new AuthInfo("root", ByteBuffer.wrap("secret".getBytes()), "instance");
+
+ // Verify the tablet state: hosted, and count
+ MetaDataStateStore metaDataStateStore = new MetaDataStateStore(instance, auths, state);
+ int count = 0;
+ for (TabletLocationState tss : metaDataStateStore) {
+ Assert.assertEquals(TabletState.HOSTED, tss.getState(state.onlineTabletServers()));
+ count++;
+ }
+ Assert.assertEquals(splits.length + 1, count);
+
+ // Create the hole
+ // Split the tablet at one end of the range
+ Mutation m = new KeyExtent(tableId, new Text("t"), new Text("p")).getPrevRowUpdateMutation();
+ ColumnFQ.put(m, Constants.METADATA_SPLIT_RATIO_COLUMN, new Value("0.5".getBytes()));
+ ColumnFQ.put(m, Constants.METADATA_OLD_PREV_ROW_COLUMN, KeyExtent.encodePrevEndRow(new Text("o")));
+ update(connector, m);
+
+ // do the state check
+ MergeStats stats = scan(state, metaDataStateStore);
+ MergeState newState = stats.nextMergeState();
+ Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, newState);
+
+ // unassign the tablets
+ BatchDeleter deleter = connector.createBatchDeleter("!METADATA", Constants.NO_AUTHS, 1000, 1000l, 1000l, 1);
+ deleter.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
+ deleter.setRanges(Collections.singletonList(new Range()));
+ deleter.delete();
+
+ // now we should be ready to merge
+ stats = scan(state, metaDataStateStore);
+ Assert.assertEquals(MergeState.MERGING, stats.nextMergeState());
+
+ // but, we have an inconsistent !METADATA table, so double check
+ Assert.assertFalse(stats.verifyMergeConsistency(connector, state));
+
+ // finish the split
+ KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o"));
+ m = tablet.getPrevRowUpdateMutation();
+ ColumnFQ.put(m, Constants.METADATA_SPLIT_RATIO_COLUMN, new Value("0.5".getBytes()));
+ update(connector, m);
+ metaDataStateStore.setLocations(Collections.singletonList(new Assignment(tablet, state.someTServer)));
+
+ // onos... there's a new tablet online
+ stats = scan(state, metaDataStateStore);
+ Assert.assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState());
+
+ // chop it
+ m = tablet.getPrevRowUpdateMutation();
+ ColumnFQ.put(m, Constants.METADATA_CHOPPED_COLUMN, new Value("junk".getBytes()));
+ update(connector, m);
+
+ stats = scan(state, metaDataStateStore);
+ Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState());
+
+ // take it offline
+ m = tablet.getPrevRowUpdateMutation();
+ Collection<Collection<String>> walogs = Collections.emptyList();
+ metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)));
+
+ // now we can split
+ stats = scan(state, metaDataStateStore);
+ Assert.assertEquals(MergeState.MERGING, stats.nextMergeState());
+
+ // and we have consistent !METADATA table
+ Assert.assertTrue(stats.verifyMergeConsistency(connector, state));
+
+ }
+
+ /**
+ * @param state
+ * @param metaDataStateStore
+ * @param locations
+ * @return
+ */
+ private MergeStats scan(MockCurrentState state, MetaDataStateStore metaDataStateStore) {
+ MergeStats stats = new MergeStats(state.mergeInfo);
+ stats.getMergeInfo().setState(MergeState.WAITING_FOR_OFFLINE);
+ for (TabletLocationState tss : metaDataStateStore) {
+ stats.update(tss.extent, tss.getState(state.onlineTabletServers()), tss.chopped);
+ }
+ return stats;
+ }
+}
Propchange: incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java
------------------------------------------------------------------------------
svn:mime-type = text/plain