You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/09/21 18:24:08 UTC
svn commit: r1388563 -
/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
Author: kturner
Date: Fri Sep 21 16:24:08 2012
New Revision: 1388563
URL: http://svn.apache.org/viewvc?rev=1388563&view=rev
Log:
ACCUMULO-706 Made batchwriter timeout trigger when tablet server is responsive, but never accepts any data. Also, the socket timeout is now only modified if the user timeout is less than the general RPC timeout.
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java?rev=1388563&r1=1388562&r2=1388563&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java Fri Sep 21 16:24:08 2012
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.client.T
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.TimedOutException;
import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.constraints.Violations;
import org.apache.accumulo.core.data.ConstraintViolationSummary;
import org.apache.accumulo.core.data.KeyExtent;
@@ -170,7 +171,7 @@ public class TabletServerBatchWriter {
firstErrorTime = null;
}
- void errorOccured(Exception e) {
+ void wroteNothing() {
if (firstErrorTime == null) {
firstErrorTime = activityTime;
} else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
@@ -178,6 +179,10 @@ public class TabletServerBatchWriter {
}
}
+ void errorOccured(Exception e) {
+ wroteNothing();
+ }
+
public long getTimeOut() {
return timeOut;
}
@@ -807,7 +812,8 @@ public class TabletServerBatchWriter {
try {
TabletClientService.Iface client;
- if (timeoutTracker.getTimeOut() < Long.MAX_VALUE)
+
+ if (timeoutTracker.getTimeOut() < instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeoutTracker.getTimeOut());
else
client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
@@ -830,7 +836,6 @@ public class TabletServerBatchWriter {
} else {
long usid = client.startUpdate(tinfo, credentials);
- timeoutTracker.madeProgress();
List<TMutation> updates = new ArrayList<TMutation>();
for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
@@ -844,22 +849,23 @@ public class TabletServerBatchWriter {
}
client.applyUpdates(tinfo, usid, entry.getKey().toThrift(), updates);
- timeoutTracker.madeProgress();
updates.clear();
size = 0;
}
}
UpdateErrors updateErrors = client.closeUpdate(tinfo, usid);
- timeoutTracker.madeProgress();
Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translator.TKET);
updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translator.TCVST));
updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translator.TKET));
+ long totalCommitted = 0;
+
for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
KeyExtent failedExtent = entry.getKey();
int numCommitted = (int) (long) entry.getValue();
+ totalCommitted += numCommitted;
String table = failedExtent.getTableId().toString();
@@ -868,6 +874,14 @@ public class TabletServerBatchWriter {
ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent);
allFailures.addAll(table, mutations.subList(numCommitted, mutations.size()));
}
+
+ if (failures.keySet().containsAll(tabMuts.keySet()) && totalCommitted == 0) {
+ // nothing was successfully written
+ timeoutTracker.wroteNothing();
+ } else {
+ // successfully wrote something to tablet server
+ timeoutTracker.madeProgress();
+ }
}
return allFailures;
} finally {