You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/12/28 04:36:59 UTC
svn commit: r1426384 - in /accumulo/trunk:
fate/src/main/java/org/apache/accumulo/fate/
fate/src/test/java/org/apache/accumulo/fate/
server/src/main/java/org/apache/accumulo/server/fate/
server/src/main/java/org/apache/accumulo/server/master/
Author: kturner
Date: Fri Dec 28 03:36:59 2012
New Revision: 1426384
URL: http://svn.apache.org/viewvc?rev=1426384&view=rev
Log:
ACCUMULO-785 added code to age off old finished FATE ops. Also fixed Fate Admin util.
Added:
accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java
accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java
accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java
Modified:
accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java
accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java?rev=1426384&r1=1426383&r2=1426384&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java (original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java Fri Dec 28 03:36:59 2012
@@ -129,8 +129,8 @@ public class AdminUtil<T> {
}
}
- public void prepDelete(ZooStore<T> zs, String path, String txidStr) {
- checkGlobalLock(path);
+ public void prepDelete(ZooStore<T> zs, IZooReaderWriter zk, String path, String txidStr) {
+ checkGlobalLock(zk, path);
long txid = Long.parseLong(txidStr, 16);
zs.reserve(txid);
@@ -138,8 +138,8 @@ public class AdminUtil<T> {
zs.unreserve(txid, 0);
}
- public void prepFail(ZooStore<T> zs, String path, String txidStr) {
- checkGlobalLock(path);
+ public void prepFail(ZooStore<T> zs, IZooReaderWriter zk, String path, String txidStr) {
+ checkGlobalLock(zk, path);
long txid = Long.parseLong(txidStr, 16);
zs.reserve(txid);
@@ -163,9 +163,17 @@ public class AdminUtil<T> {
}
}
- public void checkGlobalLock(String path) {
- if (ZooLock.getLockData(path) != null) {
- System.err.println("ERROR: Master lock is held, not running");
+  /**
+   * Exits the JVM with status -1 unless the master lock can be read and is found to be free.
+   *
+   * The process exits when lock data is present at {@code path}, and also when the lock cannot be read because of a {@link KeeperException} or an
+   * {@link InterruptedException}.
+   *
+   * @param zk
+   *          supplies the ZooKeeper handle passed to {@code ZooLock.getLockData}
+   * @param path
+   *          lock path passed to {@code ZooLock.getLockData}
+   */
+  public void checkGlobalLock(IZooReaderWriter zk, String path) {
+    try {
+      if (ZooLock.getLockData(zk.getZooKeeper(), path) != null) {
+        System.err.println("ERROR: Master lock is held, not running");
+        System.exit(-1);
+      }
+    } catch (KeeperException e) {
+      System.err.println("ERROR: Could not read master lock, not running " + e.getMessage());
+      System.exit(-1);
+    } catch (InterruptedException e) {
+      // keep the same separator before the exception text as the KeeperException branch
+      System.err.println("ERROR: Could not read master lock, not running " + e.getMessage());
       System.exit(-1);
     }
   }
Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java?rev=1426384&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java Fri Dec 28 03:36:59 2012
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.log4j.Logger;
+
+/**
+ * This store removes Repos, in the store it wraps, that are in a finished or new state for more than a configurable time period.
+ *
+ * No external time source is used. It starts tracking idle time when it is created, so transactions that already exist in the wrapped store are timed from
+ * the moment this object is constructed.
+ *
+ */
+public class AgeOffStore<T> implements TStore<T> {
+
+  /**
+   * Clock used to timestamp age off candidates. Callers may supply their own implementation through the three argument constructor.
+   */
+  public static interface TimeSource {
+    long currentTimeMillis();
+  }
+
+  private static final Logger log = Logger.getLogger(AgeOffStore.class);
+
+  private final TStore<T> store;
+  // txid -> time at which the transaction was last recorded as an age off candidate
+  private final Map<Long,Long> candidates;
+  private final long ageOffTime;
+  private final TimeSource timeSource;
+  // lower bound on the times held in candidates (Long.MAX_VALUE once an empty map has been rescanned); lets ageOff() return without walking the map when
+  // nothing can be old enough yet
+  private long minTime;
+
+  /**
+   * Recomputes minTime as the smallest time in candidates, or Long.MAX_VALUE when there are no candidates.
+   */
+  private synchronized void updateMinTime() {
+    minTime = Long.MAX_VALUE;
+
+    for (Long time : candidates.values()) {
+      if (time < minTime)
+        minTime = time;
+    }
+  }
+
+  /**
+   * Records txid as a candidate stamped with the current time, replacing any earlier stamp for the same txid.
+   */
+  private synchronized void addCandidate(long txid) {
+    long time = timeSource.currentTimeMillis();
+    candidates.put(txid, time);
+    if (time < minTime)
+      minTime = time;
+  }
+
+  /**
+   * Stops tracking txid. The minimum is rescanned only when the removed entry could have been the one defining it.
+   */
+  private synchronized void removeCandidate(long txid) {
+    Long time = candidates.remove(txid);
+    if (time != null && time <= minTime)
+      updateMinTime();
+  }
+
+  /**
+   * Deletes from the wrapped store every tracked transaction that has been a candidate for at least the configured age off time and whose status is still
+   * NEW, FAILED or SUCCESSFUL.
+   *
+   * Old candidates are removed from tracking while holding this object's monitor; the wrapped store is only called after the monitor is released. A failure
+   * while processing a transaction is logged and the remaining transactions are still processed; the failed transaction is not put back into the candidate
+   * set by this method.
+   */
+  public void ageOff() {
+    HashSet<Long> oldTxs = new HashSet<Long>();
+
+    synchronized (this) {
+      long time = timeSource.currentTimeMillis();
+      // the minTime < time guard keeps the subtraction meaningful when minTime is Long.MAX_VALUE
+      if (minTime < time && time - minTime >= ageOffTime) {
+        for (Entry<Long,Long> entry : candidates.entrySet()) {
+          if (time - entry.getValue() >= ageOffTime) {
+            oldTxs.add(entry.getKey());
+          }
+        }
+
+        candidates.keySet().removeAll(oldTxs);
+        updateMinTime();
+      }
+    }
+
+    for (Long txid : oldTxs) {
+      try {
+        store.reserve(txid);
+        try {
+          // the status is read again under the reservation; anything that is no longer new or finished is left in the store
+          switch (store.getStatus(txid)) {
+            case NEW:
+            case FAILED:
+            case SUCCESSFUL:
+              store.delete(txid);
+              log.debug("Aged off FATE tx " + String.format("%016x", txid));
+              break;
+            default:
+              break;
+          }
+
+        } finally {
+          store.unreserve(txid, 0);
+        }
+      } catch (Exception e) {
+        log.warn("Failed to age off FATE tx " + String.format("%016x", txid), e);
+      }
+    }
+  }
+
+  /**
+   * Wraps store and registers every transaction it currently lists with status NEW, FAILED or SUCCESSFUL as an age off candidate stamped with the current
+   * time.
+   *
+   * @param store
+   *          the store to wrap; each listed transaction is reserved and unreserved while its status is read
+   * @param ageOffTime
+   *          how long, in the units returned by timeSource, a transaction must remain a candidate before {@link #ageOff()} deletes it
+   * @param timeSource
+   *          clock used to stamp candidates
+   */
+  public AgeOffStore(TStore<T> store, long ageOffTime, TimeSource timeSource) {
+    this.store = store;
+    this.ageOffTime = ageOffTime;
+    this.timeSource = timeSource;
+    candidates = new HashMap<Long,Long>();
+
+    minTime = Long.MAX_VALUE;
+
+    List<Long> txids = store.list();
+    for (Long txid : txids) {
+      store.reserve(txid);
+      try {
+        switch (store.getStatus(txid)) {
+          case NEW:
+          case FAILED:
+          case SUCCESSFUL:
+            addCandidate(txid);
+            break;
+          default:
+            break;
+        }
+      } finally {
+        store.unreserve(txid, 0);
+      }
+    }
+  }
+
+  /**
+   * Same as the three argument constructor, using {@link System#currentTimeMillis()} as the clock.
+   */
+  public AgeOffStore(TStore<T> store, long ageOffTime) {
+    this(store, ageOffTime, new TimeSource() {
+      @Override
+      public long currentTimeMillis() {
+        return System.currentTimeMillis();
+      }
+    });
+  }
+
+  /**
+   * Creates a transaction in the wrapped store and immediately tracks it as an age off candidate.
+   */
+  @Override
+  public long create() {
+    long txid = store.create();
+    addCandidate(txid);
+    return txid;
+  }
+
+  @Override
+  public long reserve() {
+    return store.reserve();
+  }
+
+  @Override
+  public void reserve(long tid) {
+    store.reserve(tid);
+  }
+
+  @Override
+  public void unreserve(long tid, long deferTime) {
+    store.unreserve(tid, deferTime);
+  }
+
+  @Override
+  public Repo<T> top(long tid) {
+    return store.top(tid);
+  }
+
+  @Override
+  public void push(long tid, Repo<T> repo) throws StackOverflowException {
+    store.push(tid, repo);
+  }
+
+  @Override
+  public void pop(long tid) {
+    store.pop(tid);
+  }
+
+  @Override
+  public org.apache.accumulo.fate.TStore.TStatus getStatus(long tid) {
+    return store.getStatus(tid);
+  }
+
+  /**
+   * Sets the status in the wrapped store, then updates candidate tracking: IN_PROGRESS and FAILED_IN_PROGRESS stop tracking the transaction, FAILED and
+   * SUCCESSFUL (re)start its age off clock. Other statuses leave tracking unchanged.
+   */
+  @Override
+  public void setStatus(long tid, org.apache.accumulo.fate.TStore.TStatus status) {
+    store.setStatus(tid, status);
+
+    switch (status) {
+      case IN_PROGRESS:
+      case FAILED_IN_PROGRESS:
+        removeCandidate(tid);
+        break;
+      case FAILED:
+      case SUCCESSFUL:
+        addCandidate(tid);
+        break;
+      default:
+        break;
+    }
+  }
+
+  @Override
+  public org.apache.accumulo.fate.TStore.TStatus waitForStatusChange(long tid, EnumSet<org.apache.accumulo.fate.TStore.TStatus> expected) {
+    return store.waitForStatusChange(tid, expected);
+  }
+
+  @Override
+  public void setProperty(long tid, String prop, Serializable val) {
+    store.setProperty(tid, prop, val);
+  }
+
+  @Override
+  public Serializable getProperty(long tid, String prop) {
+    return store.getProperty(tid, prop);
+  }
+
+  /**
+   * Deletes the transaction from the wrapped store and stops tracking it.
+   */
+  @Override
+  public void delete(long tid) {
+    store.delete(tid);
+    removeCandidate(tid);
+  }
+
+  @Override
+  public List<Long> list() {
+    return store.list();
+  }
+}
Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java?rev=1426384&r1=1426383&r2=1426384&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java (original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java Fri Dec 28 03:36:59 2012
@@ -18,6 +18,7 @@ package org.apache.accumulo.fate;
import java.io.Serializable;
import java.util.EnumSet;
+import java.util.List;
/**
* Transaction Store: a place to save transactions
@@ -129,4 +130,12 @@ public interface TStore<T> {
*/
public void delete(long tid);
+ /**
+ * list all transaction ids in store
+ *
+ * @return the ids of all transactions in the store
+ */
+
+ public List<Long> list();
+
}
Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java?rev=1426384&r1=1426383&r2=1426384&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java (original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java Fri Dec 28 03:36:59 2012
@@ -424,6 +424,7 @@ public class ZooStore<T> implements TSto
}
}
+ @Override
public List<Long> list() {
try {
ArrayList<Long> l = new ArrayList<Long>();
Added: accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java?rev=1426384&view=auto
==============================================================================
--- accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java (added)
+++ accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java Fri Dec 28 03:36:59 2012
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+import org.apache.accumulo.fate.AgeOffStore.TimeSource;
+import org.apache.accumulo.fate.TStore.TStatus;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests AgeOffStore against a SimpleStore, using a clock that the test advances by hand.
+ */
+public class AgeOffStoreTest {
+
+ // clock whose value is set directly by the tests
+ private static class TestTimeSource implements TimeSource {
+ long time = 0;
+
+ public long currentTimeMillis() {
+ return time;
+ }
+
+ }
+
+ @Test
+ public void testBasic() {
+
+ // age off time is 10 and the clock starts at 0
+ TestTimeSource tts = new TestTimeSource();
+ SimpleStore<String> sstore = new SimpleStore<String>();
+ AgeOffStore<String> aoStore = new AgeOffStore<String>(sstore, 10, tts);
+
+ // age off on an empty store
+ aoStore.ageOff();
+
+ // txid1 is left IN_PROGRESS, so it stops being an age off candidate
+ Long txid1 = aoStore.create();
+ aoStore.reserve(txid1);
+ aoStore.setStatus(txid1, TStatus.IN_PROGRESS);
+ aoStore.unreserve(txid1, 0);
+
+ aoStore.ageOff();
+
+ // txid2 becomes FAILED at time 0
+ Long txid2 = aoStore.create();
+ aoStore.reserve(txid2);
+ aoStore.setStatus(txid2, TStatus.IN_PROGRESS);
+ aoStore.setStatus(txid2, TStatus.FAILED);
+ aoStore.unreserve(txid2, 0);
+
+ tts.time = 6;
+
+ // txid3 becomes SUCCESSFUL at time 6
+ Long txid3 = aoStore.create();
+ aoStore.reserve(txid3);
+ aoStore.setStatus(txid3, TStatus.IN_PROGRESS);
+ aoStore.setStatus(txid3, TStatus.SUCCESSFUL);
+ aoStore.unreserve(txid3, 0);
+
+ // txid4 is created at time 6 and stays NEW
+ Long txid4 = aoStore.create();
+
+ // at time 6 no candidate has been idle for 10 yet, so nothing is removed
+ aoStore.ageOff();
+
+ Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid2, txid3, txid4)), new HashSet<Long>(aoStore.list()));
+ Assert.assertEquals(4, new HashSet<Long>(aoStore.list()).size());
+
+ tts.time = 15;
+
+ // at time 15 only txid2 (idle since 0) is old enough; txid3 and txid4 have been idle for 9
+ aoStore.ageOff();
+
+ Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid3, txid4)), new HashSet<Long>(aoStore.list()));
+ Assert.assertEquals(3, new HashSet<Long>(aoStore.list()).size());
+
+ tts.time = 30;
+
+ // at time 30 txid3 and txid4 are removed; txid1 is IN_PROGRESS and is kept
+ aoStore.ageOff();
+
+ Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list()));
+ Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size());
+ }
+
+ @Test
+ public void testNonEmpty() {
+ // test age off when source store starts off non empty
+
+ TestTimeSource tts = new TestTimeSource();
+ SimpleStore<String> sstore = new SimpleStore<String>();
+ // populate the underlying store directly, before it is wrapped: txid1 IN_PROGRESS, txid2 FAILED, txid3 SUCCESSFUL, txid4 NEW
+ Long txid1 = sstore.create();
+ sstore.reserve(txid1);
+ sstore.setStatus(txid1, TStatus.IN_PROGRESS);
+ sstore.unreserve(txid1, 0);
+
+ Long txid2 = sstore.create();
+ sstore.reserve(txid2);
+ sstore.setStatus(txid2, TStatus.IN_PROGRESS);
+ sstore.setStatus(txid2, TStatus.FAILED);
+ sstore.unreserve(txid2, 0);
+
+ Long txid3 = sstore.create();
+ sstore.reserve(txid3);
+ sstore.setStatus(txid3, TStatus.IN_PROGRESS);
+ sstore.setStatus(txid3, TStatus.SUCCESSFUL);
+ sstore.unreserve(txid3, 0);
+
+ Long txid4 = sstore.create();
+
+ // wrapping at time 0 registers txid2, txid3 and txid4 as candidates stamped with time 0
+ AgeOffStore<String> aoStore = new AgeOffStore<String>(sstore, 10, tts);
+
+ Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid2, txid3, txid4)), new HashSet<Long>(aoStore.list()));
+ Assert.assertEquals(4, new HashSet<Long>(aoStore.list()).size());
+
+ // still time 0, nothing is old enough
+ aoStore.ageOff();
+
+ Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid2, txid3, txid4)), new HashSet<Long>(aoStore.list()));
+ Assert.assertEquals(4, new HashSet<Long>(aoStore.list()).size());
+
+ tts.time = 15;
+
+ // at time 15 the three candidates are removed; txid1 is IN_PROGRESS and is kept
+ aoStore.ageOff();
+
+ Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list()));
+ Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size());
+
+ // FAILED_IN_PROGRESS does not make txid1 a candidate
+ aoStore.reserve(txid1);
+ aoStore.setStatus(txid1, TStatus.FAILED_IN_PROGRESS);
+ aoStore.unreserve(txid1, 0);
+
+ tts.time = 30;
+
+ aoStore.ageOff();
+
+ Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list()));
+ Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size());
+
+ // txid1 becomes FAILED at time 30, which starts its age off clock
+ aoStore.reserve(txid1);
+ aoStore.setStatus(txid1, TStatus.FAILED);
+ aoStore.unreserve(txid1, 0);
+
+ // same instant, so txid1 has been idle for 0 and is kept
+ aoStore.ageOff();
+
+ Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list()));
+ Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size());
+
+ tts.time = 42;
+
+ // at time 42 txid1 has been idle for 12, which exceeds the age off time of 10
+ aoStore.ageOff();
+
+ Assert.assertEquals(0, new HashSet<Long>(aoStore.list()).size());
+ }
+}
Added: accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java?rev=1426384&view=auto
==============================================================================
--- accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java (added)
+++ accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java Fri Dec 28 03:36:59 2012
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ *
+ */
+public class SimpleStore<T> implements TStore<T> {
+
+ private long nextId = 1;
+ private Map<Long,TStatus> statuses = new HashMap<Long,TStore.TStatus>();
+ private Set<Long> reserved = new HashSet<Long>();
+
+ @Override
+ public long create() {
+ statuses.put(nextId, TStatus.NEW);
+ return nextId++;
+ }
+
+ @Override
+ public long reserve() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void reserve(long tid) {
+ if (reserved.contains(tid))
+ throw new IllegalStateException(); // zoo store would wait, but do not expect test to reserve twice... if test change, then change this
+ reserved.add(tid);
+ }
+
+ @Override
+ public void unreserve(long tid, long deferTime) {
+ if (!reserved.remove(tid)) {
+ throw new IllegalStateException();
+ }
+ }
+
+ @Override
+ public Repo<T> top(long tid) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void push(long tid, Repo<T> repo) throws StackOverflowException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void pop(long tid) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public org.apache.accumulo.fate.TStore.TStatus getStatus(long tid) {
+ if (!reserved.contains(tid))
+ throw new IllegalStateException();
+
+ TStatus status = statuses.get(tid);
+ if (status == null)
+ return TStatus.UNKNOWN;
+ return status;
+ }
+
+ @Override
+ public void setStatus(long tid, org.apache.accumulo.fate.TStore.TStatus status) {
+ if (!reserved.contains(tid))
+ throw new IllegalStateException();
+ if (!statuses.containsKey(tid))
+ throw new IllegalStateException();
+ statuses.put(tid, status);
+ }
+
+ @Override
+ public org.apache.accumulo.fate.TStore.TStatus waitForStatusChange(long tid, EnumSet<org.apache.accumulo.fate.TStore.TStatus> expected) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void setProperty(long tid, String prop, Serializable val) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Serializable getProperty(long tid, String prop) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void delete(long tid) {
+ if (!reserved.contains(tid))
+ throw new IllegalStateException();
+ statuses.remove(tid);
+ }
+
+ @Override
+ public List<Long> list() {
+ return new ArrayList<Long>(statuses.keySet());
+ }
+
+}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java?rev=1426384&r1=1426383&r2=1426384&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java Fri Dec 28 03:36:59 2012
@@ -20,12 +20,12 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.AdminUtil;
import org.apache.accumulo.fate.ZooStore;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -53,7 +53,7 @@ public class Admin {
}
@Parameters(commandDescription="List the existing FATE transactions")
- static class ListOpts {
+ static class PrintOpts {
}
public static void main(String[] args) throws Exception {
@@ -64,7 +64,7 @@ public class Admin {
jc.addCommand("fail", fail);
DeleteOpts deleteOpts = new DeleteOpts();
jc.addCommand("delete", deleteOpts);
- jc.addCommand("list", new ListOpts());
+ jc.addCommand("print", new PrintOpts());
jc.parse(args);
if (opts.help || jc.getParsedCommand() == null) {
jc.usage();
@@ -80,9 +80,9 @@ public class Admin {
ZooStore<Master> zs = new ZooStore<Master>(path, zk);
if (jc.getParsedCommand().equals("fail")) {
- admin.prepFail(zs, masterPath, args[1]);
+ admin.prepFail(zs, zk, masterPath, args[1]);
} else if (jc.getParsedCommand().equals("delete")) {
- admin.prepDelete(zs, masterPath, args[1]);
+ admin.prepDelete(zs, zk, masterPath, args[1]);
admin.deleteLocks(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, args[1]);
} else if (jc.getParsedCommand().equals("print")) {
admin.print(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS);
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1426384&r1=1426383&r2=1426384&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java Fri Dec 28 03:36:59 2012
@@ -90,6 +90,7 @@ import org.apache.accumulo.core.util.Cac
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.AgeOffStore;
import org.apache.accumulo.fate.Fate;
import org.apache.accumulo.fate.TStore.TStatus;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
@@ -2153,8 +2154,18 @@ public class Master implements LiveTServ
// TODO: add shutdown for fate object
try {
- fate = new Fate<Master>(this, new org.apache.accumulo.fate.ZooStore<Master>(ZooUtil.getRoot(instance) + Constants.ZFATE,
- ZooReaderWriter.getRetryingInstance()), 4);
+ final AgeOffStore<Master> store = new AgeOffStore<Master>(new org.apache.accumulo.fate.ZooStore<Master>(ZooUtil.getRoot(instance) + Constants.ZFATE,
+ ZooReaderWriter.getRetryingInstance()), 1000 * 60 * 60 * 8);
+
+ fate = new Fate<Master>(this, store, 4);
+
+ SimpleTimer.getInstance().schedule(new TimerTask() {
+
+ @Override
+ public void run() {
+ store.ageOff();
+ }
+ }, 63000, 63000);
} catch (KeeperException e) {
throw new IOException(e);
} catch (InterruptedException e) {