You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/09/21 22:34:55 UTC

svn commit: r1388669 - in /accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client: ./ impl/

Author: kturner
Date: Fri Sep 21 20:34:54 2012
New Revision: 1388669

URL: http://svn.apache.org/viewvc?rev=1388669&view=rev
Log:
ACCUMULO-705 ACCUMULO-706 Made failed attempts to locate a tablet timeout

Added:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java?rev=1388669&r1=1388668&r2=1388669&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java Fri Sep 21 20:34:54 2012
@@ -43,6 +43,11 @@ public class TimedOutException extends R
     
   }
 
+  public TimedOutException(String msg) {
+    super(msg);
+    this.timedoutServers = Collections.emptySet();
+  }
+
   public Set<String> getTimedOutSevers() {
     return Collections.unmodifiableSet(timedoutServers);
   }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java?rev=1388669&r1=1388668&r2=1388669&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java Fri Sep 21 20:34:54 2012
@@ -199,6 +199,6 @@ public class ScannerOptions implements S
   
   @Override
   public long getTimeout(TimeUnit timeunit) {
-    return timeOut;
+    return timeunit.convert(timeOut, TimeUnit.MILLISECONDS);
   }
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java?rev=1388669&r1=1388668&r2=1388669&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java Fri Sep 21 20:34:54 2012
@@ -101,6 +101,8 @@ public class TabletServerBatchReaderIter
   private Set<String> timedoutServers;
   private long timeout;
 
+  private TabletLocator locator;
+
   public interface ResultReceiver {
     void receive(List<Entry<Key,Value>> entries);
   }
@@ -144,6 +146,8 @@ public class TabletServerBatchReaderIter
     this.options = new ScannerOptions(scannerOptions);
     resultsQueue = new ArrayBlockingQueue<List<Entry<Key,Value>>>(numThreads);
     
+    this.locator = new TimeoutTabletLocator(TabletLocator.getInstance(instance, credentials, new Text(table)), timeout);
+
     timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchReaderIterator.TimeoutTracker>());
     timedoutServers = Collections.synchronizedSet(new HashSet<String>());
     this.timeout = timeout;
@@ -240,7 +244,7 @@ public class TabletServerBatchReaderIter
     
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     
-    binRanges(TabletLocator.getInstance(instance, credentials, new Text(table)), ranges, binnedRanges);
+    binRanges(locator, ranges, binnedRanges);
     
     doLookups(binnedRanges, receiver, columns);
   }
@@ -314,11 +318,9 @@ public class TabletServerBatchReaderIter
     for (List<Range> ranges : failures.values())
       allRanges.addAll(ranges);
     
-    TabletLocator tabletLocator = TabletLocator.getInstance(instance, credentials, new Text(table));
-    
     // since the first call to binRanges clipped the ranges to within a tablet, we should not get only
     // bin to the set of failed tablets
-    binRanges(tabletLocator, allRanges, binnedRanges);
+    binRanges(locator, allRanges, binnedRanges);
     
     doLookups(binnedRanges, receiver, columns);
   }
@@ -360,8 +362,7 @@ public class TabletServerBatchReaderIter
         doLookup(tsLocation, tabletsRanges, tsFailures, unscanned, receiver, columns, credentials, options, authorizations, instance.getConfiguration(),
             timeoutTracker);
         if (tsFailures.size() > 0) {
-          TabletLocator tabletLocator = TabletLocator.getInstance(instance, credentials, new Text(table));
-          tabletLocator.invalidateCache(tsFailures.keySet());
+          locator.invalidateCache(tsFailures.keySet());
           synchronized (failures) {
             failures.putAll(tsFailures);
           }
@@ -373,7 +374,7 @@ public class TabletServerBatchReaderIter
           failures.putAll(unscanned);
         }
         
-        TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(tsLocation);
+        locator.invalidateCache(tsLocation);
         log.debug(e.getMessage(), e);
       } catch (AccumuloSecurityException e) {
         log.debug(e.getMessage(), e);

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java?rev=1388669&r1=1388668&r2=1388669&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java Fri Sep 21 20:34:54 2012
@@ -592,18 +592,31 @@ public class TabletServerBatchWriter {
     private ExecutorService sendThreadPool;
     private Map<String,TabletServerMutations> serversMutations;
     private Set<String> queued;
+    private Map<String,TabletLocator> locators;
     
     public MutationWriter(int numSendThreads) {
       serversMutations = new HashMap<String,TabletServerMutations>();
       queued = new HashSet<String>();
       sendThreadPool = Executors.newFixedThreadPool(numSendThreads, new NamingThreadFactory(this.getClass().getName()));
+      locators = new HashMap<String,TabletLocator>();
     }
     
+    private TabletLocator getLocator(String tableId) {
+      TabletLocator ret = locators.get(tableId);
+      if (ret == null) {
+        ret = TabletLocator.getInstance(instance, credentials, new Text(tableId));
+        ret = new TimeoutTabletLocator(ret, timeout);
+        locators.put(tableId, ret);
+      }
+      
+      return ret;
+    }
+
     private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations> binnedMutations) {
       try {
         Set<Entry<String,List<Mutation>>> es = mutationsToProcess.getMutations().entrySet();
         for (Entry<String,List<Mutation>> entry : es) {
-          TabletLocator locator = TabletLocator.getInstance(instance, credentials, new Text(entry.getKey()));
+          TabletLocator locator = getLocator(entry.getKey());
           
           String table = entry.getKey();
           List<Mutation> tableMutations = entry.getValue();

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java?rev=1388669&r1=1388668&r2=1388669&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java Fri Sep 21 20:34:54 2012
@@ -243,6 +243,8 @@ public class ThriftScanner {
           if ((currentTime - startTime) / 1000.0 > timeOut)
             throw new ScanTimedOutException();
           
+          log.trace(((currentTime - startTime) / 1000.0) + " " + timeOut);
+
           Span locateSpan = Trace.start("scan:locateTablet");
           try {
             loc = TabletLocator.getInstance(instance, credentials, scanState.tableName).locateTablet(scanState.startRow, scanState.skipStartRow, false);

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java?rev=1388669&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java Fri Sep 21 20:34:54 2012
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TimedOutException;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.io.Text;
+
+/**
+ * 
+ */
+public class TimeoutTabletLocator extends TabletLocator {
+  
+  private TabletLocator locator;
+  private long timeout;
+  private Long firstFailTime = null;
+  
+  private void failed() {
+    if (firstFailTime == null) {
+      firstFailTime = System.currentTimeMillis();
+    } else if (System.currentTimeMillis() - firstFailTime > timeout) {
+      throw new TimedOutException("Failed to obtain metadata");
+    }
+  }
+  
+  private void succeeded() {
+    firstFailTime = null;
+  }
+  
+  public TimeoutTabletLocator(TabletLocator locator, long timeout) {
+    this.locator = locator;
+    this.timeout = timeout;
+  }
+
+  @Override
+  public TabletLocation locateTablet(Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    
+    try {
+      TabletLocation ret = locator.locateTablet(row, skipRow, retry);
+      
+      if (ret == null)
+        failed();
+      else
+        succeeded();
+      
+      return ret;
+    } catch (AccumuloException ae) {
+      failed();
+      throw ae;
+    }
+  }
+  
+  @Override
+  public void binMutations(List<Mutation> mutations, Map<String,TabletServerMutations> binnedMutations, List<Mutation> failures) throws AccumuloException,
+      AccumuloSecurityException, TableNotFoundException {
+    try {
+      locator.binMutations(mutations, binnedMutations, failures);
+
+      if (failures.size() == mutations.size())
+        failed();
+      else
+        succeeded();
+
+    } catch (AccumuloException ae) {
+      failed();
+      throw ae;
+    }
+  }
+  
+  /**
+   * 
+   */
+
+  @Override
+  public List<Range> binRanges(List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
+    
+    try {
+      List<Range> ret = locator.binRanges(ranges, binnedRanges);
+      
+      if (ranges.size() == ret.size())
+        failed();
+      else
+        succeeded();
+      
+      return ret;
+    } catch (AccumuloException ae) {
+      failed();
+      throw ae;
+    }
+  }
+  
+  @Override
+  public void invalidateCache(KeyExtent failedExtent) {
+    locator.invalidateCache(failedExtent);
+  }
+  
+  @Override
+  public void invalidateCache(Collection<KeyExtent> keySet) {
+    locator.invalidateCache(keySet);
+  }
+  
+  @Override
+  public void invalidateCache() {
+    locator.invalidateCache();
+  }
+  
+  @Override
+  public void invalidateCache(String server) {
+    locator.invalidateCache(server);
+  }
+  
+}