You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/23 18:55:10 UTC
[45/50] git commit: ACCUMULO-1000 fixed a lot of odds and ends
ACCUMULO-1000 fixed a lot of odds and ends
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5e908585
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5e908585
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5e908585
Branch: refs/heads/ACCUMULO-1000
Commit: 5e908585aa76b605840c87e2f769e2aff642b3a6
Parents: 7bb5f8f
Author: Keith Turner <kt...@apache.org>
Authored: Mon Jul 22 16:37:03 2013 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jul 22 16:37:03 2013 -0400
----------------------------------------------------------------------
.../core/client/impl/ConditionalWriterImpl.java | 43 +++--
.../iterators/system/ColumnQualifierFilter.java | 5 +-
.../accumulo/server/tabletserver/Tablet.java | 8 +-
.../server/tabletserver/TabletServer.java | 173 ++++++++++---------
4 files changed, 123 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e908585/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index c87c865..ed20054 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -65,6 +65,7 @@ import org.apache.accumulo.core.util.BadArgumentException;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.LoggingRunnable;
import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.accumulo.trace.thrift.TInfo;
import org.apache.commons.collections.map.LRUMap;
@@ -81,6 +82,10 @@ class ConditionalWriterImpl implements ConditionalWriter {
private static final Logger log = Logger.getLogger(ConditionalWriterImpl.class);
+ private static final int MAX_SLEEP = 5000;
+
+ private static final long SESSION_CACHE_TIME = 60000;
+
private Authorizations auths;
private VisibilityEvaluator ve;
@SuppressWarnings("unchecked")
@@ -167,7 +172,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
void resetDelay() {
// TODO eventually timeout a mutation
- delay = Math.min(delay * 2, 5000);
+ delay = Math.min(delay * 2, MAX_SLEEP);
resetTime = System.currentTimeMillis();
}
}
@@ -231,7 +236,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
synchronized (serverQueue) {
serverQueue.queue.add(mutations);
- //never execute more that one task per server
+ // never execute more than one task per server
if(!serverQueue.taskQueued){
threadPool.execute(new LoggingRunnable(log, new SendTask(location)));
serverQueue.taskQueued = true;
@@ -357,12 +362,13 @@ class ConditionalWriterImpl implements ConditionalWriter {
@Override
public void run() {
- TabletServerMutations<QCMutation> mutations = dequeue(location);
- if (mutations != null)
- sendToServer(location, mutations);
-
- //TODO if exception is thrown, will not reschedule
- reschedule(this);
+ try {
+ TabletServerMutations<QCMutation> mutations = dequeue(location);
+ if (mutations != null)
+ sendToServer(location, mutations);
+ } finally {
+ reschedule(this);
+ }
}
}
@@ -380,6 +386,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
private static class SessionID {
long sessionID;
boolean reserved;
+ long lastAccessTime;
}
private HashMap<String, SessionID> cachedSessionIDs = new HashMap<String, SessionID>();
@@ -392,8 +399,12 @@ class ConditionalWriterImpl implements ConditionalWriter {
if (sid.reserved)
throw new IllegalStateException();
- sid.reserved = true;
- return sid.sessionID;
+ if (System.currentTimeMillis() - sid.lastAccessTime > SESSION_CACHE_TIME) {
+ cachedSessionIDs.remove(location);
+ } else {
+ sid.reserved = true;
+ return sid.sessionID;
+ }
}
}
@@ -423,6 +434,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
if(!sid.reserved)
throw new IllegalStateException();
sid.reserved = false;
+ sid.lastAccessTime = System.currentTimeMillis();
}
}
@@ -470,9 +482,6 @@ class ConditionalWriterImpl implements ConditionalWriter {
}
}
-
- // TODO maybe have thrift call return bad extents
-
for (KeyExtent ke : extentsToInvalidate) {
locator.invalidateCache(ke);
}
@@ -533,14 +542,14 @@ class ConditionalWriterImpl implements ConditionalWriter {
*/
private void invalidateSession(long sessionId, String location, TabletServerMutations<QCMutation> mutations) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
- // TODO could assume tserver will invalidate sessions after a given time period
-
ArrayList<QCMutation> mutList = new ArrayList<QCMutation>();
for (List<QCMutation> tml : mutations.getMutations().values()) {
mutList.addAll(tml);
}
+ long sleepTime = 50;
+
while (true) {
Map<String,TabletServerMutations<QCMutation>> binnedMutations = new HashMap<String,TabletLocator.TabletServerMutations<QCMutation>>();
List<QCMutation> failures = new ArrayList<QCMutation>();
@@ -565,7 +574,9 @@ class ConditionalWriterImpl implements ConditionalWriter {
locator.invalidateCache(location);
}
- //TODO sleep
+ UtilWaitThread.sleep(sleepTime);
+ sleepTime = Math.min(2 * sleepTime, MAX_SLEEP);
+
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e908585/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java
index 1595f5a..d5ca3b4 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.iterators.system;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.Set;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
@@ -36,7 +37,7 @@ public class ColumnQualifierFilter extends Filter {
public ColumnQualifierFilter() {}
- public ColumnQualifierFilter(SortedKeyValueIterator<Key,Value> iterator, HashSet<Column> columns) {
+ public ColumnQualifierFilter(SortedKeyValueIterator<Key,Value> iterator, Set<Column> columns) {
setSource(iterator);
init(columns);
}
@@ -63,7 +64,7 @@ public class ColumnQualifierFilter extends Filter {
return cfset != null && cfset.contains(key.getColumnFamilyData());
}
- public void init(HashSet<Column> columns) {
+ public void init(Set<Column> columns) {
this.columnFamilies = new HashSet<ByteSequence>();
this.columnsQualifiers = new HashMap<ByteSequence,HashSet<ByteSequence>>();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e908585/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index 1305be6..035d9b0 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@ -1651,7 +1651,7 @@ public class Tablet {
}
}
- private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, HashSet<Column> columns) throws IOException {
+ private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, Set<Column> columns) throws IOException {
// log.info("In nextBatch..");
@@ -1739,7 +1739,7 @@ public class Tablet {
public long numBytes;
}
- Scanner createScanner(Range range, int num, HashSet<Column> columns, Authorizations authorizations, List<IterInfo> ssiList,
+ Scanner createScanner(Range range, int num, Set<Column> columns, Authorizations authorizations, List<IterInfo> ssiList,
Map<String,Map<String,String>> ssio, boolean isolated, AtomicBoolean interruptFlag) {
// do a test to see if this range falls within the tablet, if it does not
// then clip will throw an exception
@@ -1873,14 +1873,14 @@ public class Tablet {
// scan options
Authorizations authorizations;
byte[] defaultLabels;
- HashSet<Column> columnSet;
+ Set<Column> columnSet;
List<IterInfo> ssiList;
Map<String,Map<String,String>> ssio;
AtomicBoolean interruptFlag;
int num;
boolean isolated;
- ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList,
+ ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList,
Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, boolean isolated) {
this.num = num;
this.authorizations = authorizations;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e908585/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 013639e..8f33488 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -750,6 +750,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
public TCredentials credentials;
public Authorizations auths;
public String tableId;
+ public AtomicBoolean interruptFlag;
+
+ @Override
+ public void cleanup() {
+ interruptFlag.set(true);
+ }
}
private static class UpdateSession extends Session {
@@ -901,6 +907,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
WriteTracker writeTracker = new WriteTracker();
+ private RowLocks rowLocks = new RowLocks();
+
ThriftClientHandler() {
super(instance, watcher);
log.debug(ThriftClientHandler.class.getName() + " created");
@@ -1730,16 +1738,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
writeTracker.finishWrite(opid);
}
}
-
- private RowLocks rowLocks = new RowLocks();
- private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, Authorizations authorizations,
- List<String> symbols) {
+ private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession cs,
+ List<String> symbols) throws IOException {
Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = updates.entrySet().iterator();
- // TODO use constant
- HashSet<Column> columns = new HashSet<Column>();
-
CompressedIterators compressedIters = new CompressedIterators(symbols);
while (iter.hasNext()) {
@@ -1752,97 +1755,91 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
iter.remove();
} else {
List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(entry.getValue().size());
-
- // TODO extract to method
- for (ServerConditionalMutation scm : entry.getValue()) {
- boolean add = true;
- for(TCondition tc : scm.getConditions()){
-
- Range range;
- if (tc.hasTimestamp)
- range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
- else
- range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
-
- AtomicBoolean interruptFlag = new AtomicBoolean();
-
- IterConfig ic = compressedIters.decompress(tc.iterators);
- //TODO use one iterator per tablet, push checks into tablet?
- Scanner scanner = tablet.createScanner(range, 1, columns, authorizations, ic.ssiList, ic.ssio, false, interruptFlag);
-
- try {
- ScanBatch batch = scanner.read();
-
- Value val = null;
-
- for (KVEntry entry2 : batch.results) {
- val = entry2.getValue();
- break;
- }
-
- if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
- results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
- add = false;
- break;
- }
-
- } catch (TabletClosedException e) {
- // TODO ignore rest of tablets mutations
- results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
- add = false;
- break;
- } catch (IterationInterruptedException iie) {
- // TODO determine why this happened, ignore rest of tablets mutations?
- results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
- add = false;
- break;
- } catch (TooManyFilesException tmfe) {
- // TODO handle differently?
- results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
- add = false;
- break;
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } finally {
- scanner.close();
- }
- }
-
- if (add)
+ for (ServerConditionalMutation scm : entry.getValue()) {
+ if (checkCondition(results, cs, compressedIters, tablet, scm))
okMutations.add(scm);
}
- // TODO just rebuild map
- entry.getValue().clear();
- entry.getValue().addAll(okMutations);
+ entry.setValue(okMutations);
}
}
}
- private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, TCredentials credentials) {
+ boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession cs, CompressedIterators compressedIters,
+ Tablet tablet, ServerConditionalMutation scm) throws IOException {
+ boolean add = true;
+
+ Set<Column> emptyCols = Collections.emptySet();
+
+ for(TCondition tc : scm.getConditions()){
+
+ Range range;
+ if (tc.hasTimestamp)
+ range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
+ else
+ range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
+
+ IterConfig ic = compressedIters.decompress(tc.iterators);
+
+ //TODO use one iterator per tablet, push checks into tablet?
+ Scanner scanner = tablet.createScanner(range, 1, emptyCols, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag);
+
+ try {
+ ScanBatch batch = scanner.read();
+
+ Value val = null;
+
+ for (KVEntry entry2 : batch.results) {
+ val = entry2.getValue();
+ break;
+ }
+
+ if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
+ results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
+ add = false;
+ break;
+ }
+
+ } catch (TabletClosedException e) {
+ results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ add = false;
+ break;
+ } catch (IterationInterruptedException iie) {
+ results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ add = false;
+ break;
+ } catch (TooManyFilesException tmfe) {
+ results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ add = false;
+ break;
+ }
+ }
+ return add;
+ }
+
+ private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) {
Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
// TODO stats
+ boolean sessionCanceled = sess.interruptFlag.get();
+
for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
Tablet tablet = onlineTablets.get(entry.getKey());
- if (tablet == null || tablet.isClosed()) {
+ if (tablet == null || tablet.isClosed() || sessionCanceled) {
for (ServerConditionalMutation scm : entry.getValue())
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
} else {
- // TODO write tracker
-
try {
List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
if (mutations.size() > 0) {
- CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations);
+ CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations);
if (cs == null) {
for (ServerConditionalMutation scm : entry.getValue())
@@ -1889,8 +1886,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
- private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(TCredentials credentials, Authorizations authorizations,
- Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, List<String> symbols) {
+ private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs, Map<KeyExtent,List<ServerConditionalMutation>> updates,
+ ArrayList<TCMResult> results, List<String> symbols) throws IOException {
// sort each list of mutations, this is done to avoid deadlock and doing seeks in order is more efficient and detect duplicate rows.
ConditionalMutationSet.sortConditionalMutations(updates);
@@ -1902,8 +1899,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// get as many locks as possible w/o blocking... defer any rows that are locked
List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
try {
- checkConditions(updates, results, authorizations, symbols);
- writeConditionalMutations(updates, results, credentials);
+ checkConditions(updates, results, cs, symbols);
+ writeConditionalMutations(updates, results, cs);
} finally {
rowLocks.releaseRowLocks(locks);
}
@@ -1926,6 +1923,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
cs.auths = new Authorizations(authorizations);
cs.credentials = credentials;
cs.tableId = tableID;
+ cs.interruptFlag = new AtomicBoolean();
return sessionManager.createSession(cs, false);
}
@@ -1934,34 +1932,36 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
throws NoSuchScanIDException, TException {
// TODO sessions, should show up in list scans
- // TODO timeout like scans do
ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
if(cs == null)
throw new NoSuchScanIDException();
-
+ Text tid = new Text(cs.tableId);
+ long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
try{
Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET,
new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
-
- Text tid = new Text(cs.tableId);
+
for(KeyExtent ke : updates.keySet())
if(!ke.getTableId().equals(tid))
throw new IllegalArgumentException("Unexpected table id "+tid+" != "+ke.getTableId());
ArrayList<TCMResult> results = new ArrayList<TCMResult>();
- Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs.credentials, cs.auths, updates, results, symbols);
+ Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs, updates, results, symbols);
while (deferred.size() > 0) {
- deferred = conditionalUpdate(cs.credentials, cs.auths, deferred, results, symbols);
+ deferred = conditionalUpdate(cs, deferred, results, symbols);
}
return results;
+ } catch (IOException ioe) {
+ throw new TException(ioe);
}finally{
+ writeTracker.finishWrite(opid);
sessionManager.unreserveSession(sessID);
}
}
@@ -1970,7 +1970,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
//this method should wait for any running conditional update to complete
//after this method returns a conditional update should not be able to start
- ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
+
+ ConditionalSession cs = (ConditionalSession) sessionManager.getSession(sessID);
+ if (cs != null)
+ cs.interruptFlag.set(true);
+
+ cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
if(cs != null)
sessionManager.removeSession(sessID, true);
}