You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2011/11/15 22:51:03 UTC
svn commit: r1202436 -
/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
Author: kturner
Date: Tue Nov 15 21:51:02 2011
New Revision: 1202436
URL: http://svn.apache.org/viewvc?rev=1202436&view=rev
Log:
ACCUMULO-120 made special case for single mutation in batch writer
Modified:
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java?rev=1202436&r1=1202435&r2=1202436&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java Tue Nov 15 21:51:02 2011
@@ -54,7 +54,9 @@ import org.apache.accumulo.core.data.thr
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.hadoop.io.Text;
@@ -752,42 +754,54 @@ public class TabletServerBatchWriter {
try {
MutationSet allFailures = new MutationSet();
- long usid = client.startUpdate(null, credentials);
-
- List<TMutation> updates = new ArrayList<TMutation>();
- for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
- long size = 0;
- Iterator<Mutation> iter = entry.getValue().iterator();
- while (iter.hasNext()) {
- while (size < MUTATION_BATCH_SIZE && iter.hasNext()) {
- Mutation mutation = iter.next();
- updates.add(mutation.toThrift());
- size += mutation.numBytes();
- }
-
- client.applyUpdates(null, usid, entry.getKey().toThrift(), updates);
- updates.clear();
- size = 0;
+ if (tabMuts.size() == 1 && tabMuts.values().iterator().next().size() == 1) {
+ Entry<KeyExtent,List<Mutation>> entry = tabMuts.entrySet().iterator().next();
+
+ try {
+ client.update(null, credentials, entry.getKey().toThrift(), entry.getValue().get(0).toThrift());
+ } catch (NotServingTabletException e) {
+ allFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue());
+ } catch (ConstraintViolationException e) {
+ updatedConstraintViolations(Translator.translate(e.violationSummaries, Translator.TCVST));
}
- }
-
- UpdateErrors updateErrors = client.closeUpdate(null, usid);
- Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translator.TKET);
- updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translator.TCVST));
- updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translator.TKET));
-
- for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
- KeyExtent failedExtent = entry.getKey();
- int numCommitted = (int) (long) entry.getValue();
+ } else {
+
+ long usid = client.startUpdate(null, credentials);
- String table = failedExtent.getTableId().toString();
+ List<TMutation> updates = new ArrayList<TMutation>();
+ for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
+ long size = 0;
+ Iterator<Mutation> iter = entry.getValue().iterator();
+ while (iter.hasNext()) {
+ while (size < MUTATION_BATCH_SIZE && iter.hasNext()) {
+ Mutation mutation = iter.next();
+ updates.add(mutation.toThrift());
+ size += mutation.numBytes();
+ }
+
+ client.applyUpdates(null, usid, entry.getKey().toThrift(), updates);
+ updates.clear();
+ size = 0;
+ }
+ }
- TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(failedExtent);
+ UpdateErrors updateErrors = client.closeUpdate(null, usid);
+ Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translator.TKET);
+ updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translator.TCVST));
+ updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translator.TKET));
- ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent);
- allFailures.addAll(table, mutations.subList(numCommitted, mutations.size()));
+ for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
+ KeyExtent failedExtent = entry.getKey();
+ int numCommitted = (int) (long) entry.getValue();
+
+ String table = failedExtent.getTableId().toString();
+
+ TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(failedExtent);
+
+ ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent);
+ allFailures.addAll(table, mutations.subList(numCommitted, mutations.size()));
+ }
}
-
return allFailures;
} finally {
ThriftUtil.returnClient((TServiceClient) client);