You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/09/21 18:24:08 UTC

svn commit: r1388563 - /accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java

Author: kturner
Date: Fri Sep 21 16:24:08 2012
New Revision: 1388563

URL: http://svn.apache.org/viewvc?rev=1388563&view=rev
Log:
ACCUMULO-706 Made the batch writer timeout trigger when a tablet server is responsive but never accepts any data.  Also, the socket timeout is now only modified if the user timeout is less than the general RPC timeout.

Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java?rev=1388563&r1=1388562&r2=1388563&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java Fri Sep 21 16:24:08 2012
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.client.T
 import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.TimedOutException;
 import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.constraints.Violations;
 import org.apache.accumulo.core.data.ConstraintViolationSummary;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -170,7 +171,7 @@ public class TabletServerBatchWriter {
       firstErrorTime = null;
     }
 
-    void errorOccured(Exception e) {
+    void wroteNothing() {
       if (firstErrorTime == null) {
         firstErrorTime = activityTime;
       } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
@@ -178,6 +179,10 @@ public class TabletServerBatchWriter {
       }
     }
     
+    void errorOccured(Exception e) {
+      wroteNothing();
+    }
+
     public long getTimeOut() {
       return timeOut;
     }
@@ -807,7 +812,8 @@ public class TabletServerBatchWriter {
 
       try {
         TabletClientService.Iface client;
-        if (timeoutTracker.getTimeOut() < Long.MAX_VALUE)
+        
+        if (timeoutTracker.getTimeOut() < instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
           client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeoutTracker.getTimeOut());
         else
           client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
@@ -830,7 +836,6 @@ public class TabletServerBatchWriter {
           } else {
             
             long usid = client.startUpdate(tinfo, credentials);
-            timeoutTracker.madeProgress();
             
             List<TMutation> updates = new ArrayList<TMutation>();
             for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
@@ -844,22 +849,23 @@ public class TabletServerBatchWriter {
                 }
                 
                 client.applyUpdates(tinfo, usid, entry.getKey().toThrift(), updates);
-                timeoutTracker.madeProgress();
                 updates.clear();
                 size = 0;
               }
             }
             
             UpdateErrors updateErrors = client.closeUpdate(tinfo, usid);
-            timeoutTracker.madeProgress();
 
             Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translator.TKET);
             updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translator.TCVST));
             updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translator.TKET));
             
+            long totalCommitted = 0;
+
             for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
               KeyExtent failedExtent = entry.getKey();
               int numCommitted = (int) (long) entry.getValue();
+              totalCommitted += numCommitted;
               
               String table = failedExtent.getTableId().toString();
               
@@ -868,6 +874,14 @@ public class TabletServerBatchWriter {
               ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent);
               allFailures.addAll(table, mutations.subList(numCommitted, mutations.size()));
             }
+            
+            if (failures.keySet().containsAll(tabMuts.keySet()) && totalCommitted == 0) {
+              // nothing was successfully written
+              timeoutTracker.wroteNothing();
+            } else {
+              // successfully wrote something to tablet server
+              timeoutTracker.madeProgress();
+            }
           }
           return allFailures;
         } finally {