You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2011/11/15 22:51:03 UTC

svn commit: r1202436 - /incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java

Author: kturner
Date: Tue Nov 15 21:51:02 2011
New Revision: 1202436

URL: http://svn.apache.org/viewvc?rev=1202436&view=rev
Log:
ACCUMULO-120 made special case for single mutation in batch writer

Modified:
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java?rev=1202436&r1=1202435&r2=1202436&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java Tue Nov 15 21:51:02 2011
@@ -54,7 +54,9 @@ import org.apache.accumulo.core.data.thr
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.hadoop.io.Text;
@@ -752,42 +754,54 @@ public class TabletServerBatchWriter {
         try {
           MutationSet allFailures = new MutationSet();
           
-          long usid = client.startUpdate(null, credentials);
-          
-          List<TMutation> updates = new ArrayList<TMutation>();
-          for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
-            long size = 0;
-            Iterator<Mutation> iter = entry.getValue().iterator();
-            while (iter.hasNext()) {
-              while (size < MUTATION_BATCH_SIZE && iter.hasNext()) {
-                Mutation mutation = iter.next();
-                updates.add(mutation.toThrift());
-                size += mutation.numBytes();
-              }
-              
-              client.applyUpdates(null, usid, entry.getKey().toThrift(), updates);
-              updates.clear();
-              size = 0;
+          if (tabMuts.size() == 1 && tabMuts.values().iterator().next().size() == 1) {
+            Entry<KeyExtent,List<Mutation>> entry = tabMuts.entrySet().iterator().next();
+            
+            try {
+              client.update(null, credentials, entry.getKey().toThrift(), entry.getValue().get(0).toThrift());
+            } catch (NotServingTabletException e) {
+              allFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue());
+            } catch (ConstraintViolationException e) {
+              updatedConstraintViolations(Translator.translate(e.violationSummaries, Translator.TCVST));
             }
-          }
-          
-          UpdateErrors updateErrors = client.closeUpdate(null, usid);
-          Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translator.TKET);
-          updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translator.TCVST));
-          updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translator.TKET));
-          
-          for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
-            KeyExtent failedExtent = entry.getKey();
-            int numCommitted = (int) (long) entry.getValue();
+          } else {
+            
+            long usid = client.startUpdate(null, credentials);
             
-            String table = failedExtent.getTableId().toString();
+            List<TMutation> updates = new ArrayList<TMutation>();
+            for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
+              long size = 0;
+              Iterator<Mutation> iter = entry.getValue().iterator();
+              while (iter.hasNext()) {
+                while (size < MUTATION_BATCH_SIZE && iter.hasNext()) {
+                  Mutation mutation = iter.next();
+                  updates.add(mutation.toThrift());
+                  size += mutation.numBytes();
+                }
+                
+                client.applyUpdates(null, usid, entry.getKey().toThrift(), updates);
+                updates.clear();
+                size = 0;
+              }
+            }
             
-            TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(failedExtent);
+            UpdateErrors updateErrors = client.closeUpdate(null, usid);
+            Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translator.TKET);
+            updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translator.TCVST));
+            updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translator.TKET));
             
-            ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent);
-            allFailures.addAll(table, mutations.subList(numCommitted, mutations.size()));
+            for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
+              KeyExtent failedExtent = entry.getKey();
+              int numCommitted = (int) (long) entry.getValue();
+              
+              String table = failedExtent.getTableId().toString();
+              
+              TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(failedExtent);
+              
+              ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent);
+              allFailures.addAll(table, mutations.subList(numCommitted, mutations.size()));
+            }
           }
-          
           return allFailures;
         } finally {
           ThriftUtil.returnClient((TServiceClient) client);