You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/09/21 22:34:55 UTC
svn commit: r1388669 - in
/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client: ./ impl/
Author: kturner
Date: Fri Sep 21 20:34:54 2012
New Revision: 1388669
URL: http://svn.apache.org/viewvc?rev=1388669&view=rev
Log:
ACCUMULO-705 ACCUMULO-706 Made failed attempts to locate a tablet timeout
Added:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java?rev=1388669&r1=1388668&r2=1388669&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/TimedOutException.java Fri Sep 21 20:34:54 2012
@@ -43,6 +43,11 @@ public class TimedOutException extends R
}
+ public TimedOutException(String msg) {
+ super(msg);
+ this.timedoutServers = Collections.emptySet();
+ }
+
public Set<String> getTimedOutSevers() {
return Collections.unmodifiableSet(timedoutServers);
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java?rev=1388669&r1=1388668&r2=1388669&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java Fri Sep 21 20:34:54 2012
@@ -199,6 +199,6 @@ public class ScannerOptions implements S
@Override
public long getTimeout(TimeUnit timeunit) {
- return timeOut;
+ return timeunit.convert(timeOut, TimeUnit.MILLISECONDS);
}
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java?rev=1388669&r1=1388668&r2=1388669&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java Fri Sep 21 20:34:54 2012
@@ -101,6 +101,8 @@ public class TabletServerBatchReaderIter
private Set<String> timedoutServers;
private long timeout;
+ private TabletLocator locator;
+
public interface ResultReceiver {
void receive(List<Entry<Key,Value>> entries);
}
@@ -144,6 +146,8 @@ public class TabletServerBatchReaderIter
this.options = new ScannerOptions(scannerOptions);
resultsQueue = new ArrayBlockingQueue<List<Entry<Key,Value>>>(numThreads);
+ this.locator = new TimeoutTabletLocator(TabletLocator.getInstance(instance, credentials, new Text(table)), timeout);
+
timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchReaderIterator.TimeoutTracker>());
timedoutServers = Collections.synchronizedSet(new HashSet<String>());
this.timeout = timeout;
@@ -240,7 +244,7 @@ public class TabletServerBatchReaderIter
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
- binRanges(TabletLocator.getInstance(instance, credentials, new Text(table)), ranges, binnedRanges);
+ binRanges(locator, ranges, binnedRanges);
doLookups(binnedRanges, receiver, columns);
}
@@ -314,11 +318,9 @@ public class TabletServerBatchReaderIter
for (List<Range> ranges : failures.values())
allRanges.addAll(ranges);
- TabletLocator tabletLocator = TabletLocator.getInstance(instance, credentials, new Text(table));
-
// since the first call to binRanges clipped the ranges to within a tablet, we should not get only
// bin to the set of failed tablets
- binRanges(tabletLocator, allRanges, binnedRanges);
+ binRanges(locator, allRanges, binnedRanges);
doLookups(binnedRanges, receiver, columns);
}
@@ -360,8 +362,7 @@ public class TabletServerBatchReaderIter
doLookup(tsLocation, tabletsRanges, tsFailures, unscanned, receiver, columns, credentials, options, authorizations, instance.getConfiguration(),
timeoutTracker);
if (tsFailures.size() > 0) {
- TabletLocator tabletLocator = TabletLocator.getInstance(instance, credentials, new Text(table));
- tabletLocator.invalidateCache(tsFailures.keySet());
+ locator.invalidateCache(tsFailures.keySet());
synchronized (failures) {
failures.putAll(tsFailures);
}
@@ -373,7 +374,7 @@ public class TabletServerBatchReaderIter
failures.putAll(unscanned);
}
- TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(tsLocation);
+ locator.invalidateCache(tsLocation);
log.debug(e.getMessage(), e);
} catch (AccumuloSecurityException e) {
log.debug(e.getMessage(), e);
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java?rev=1388669&r1=1388668&r2=1388669&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java Fri Sep 21 20:34:54 2012
@@ -592,18 +592,31 @@ public class TabletServerBatchWriter {
private ExecutorService sendThreadPool;
private Map<String,TabletServerMutations> serversMutations;
private Set<String> queued;
+ private Map<String,TabletLocator> locators;
public MutationWriter(int numSendThreads) {
serversMutations = new HashMap<String,TabletServerMutations>();
queued = new HashSet<String>();
sendThreadPool = Executors.newFixedThreadPool(numSendThreads, new NamingThreadFactory(this.getClass().getName()));
+ locators = new HashMap<String,TabletLocator>();
}
+ private TabletLocator getLocator(String tableId) {
+ TabletLocator ret = locators.get(tableId);
+ if (ret == null) {
+ ret = TabletLocator.getInstance(instance, credentials, new Text(tableId));
+ ret = new TimeoutTabletLocator(ret, timeout);
+ locators.put(tableId, ret);
+ }
+
+ return ret;
+ }
+
private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations> binnedMutations) {
try {
Set<Entry<String,List<Mutation>>> es = mutationsToProcess.getMutations().entrySet();
for (Entry<String,List<Mutation>> entry : es) {
- TabletLocator locator = TabletLocator.getInstance(instance, credentials, new Text(entry.getKey()));
+ TabletLocator locator = getLocator(entry.getKey());
String table = entry.getKey();
List<Mutation> tableMutations = entry.getValue();
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java?rev=1388669&r1=1388668&r2=1388669&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java Fri Sep 21 20:34:54 2012
@@ -243,6 +243,8 @@ public class ThriftScanner {
if ((currentTime - startTime) / 1000.0 > timeOut)
throw new ScanTimedOutException();
+ log.trace(((currentTime - startTime) / 1000.0) + " " + timeOut);
+
Span locateSpan = Trace.start("scan:locateTablet");
try {
loc = TabletLocator.getInstance(instance, credentials, scanState.tableName).locateTablet(scanState.startRow, scanState.skipStartRow, false);
Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java?rev=1388669&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java Fri Sep 21 20:34:54 2012
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TimedOutException;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.io.Text;
+
+/**
+ *
+ */
+public class TimeoutTabletLocator extends TabletLocator {
+
+ private TabletLocator locator;
+ private long timeout;
+ private Long firstFailTime = null;
+
+ private void failed() {
+ if (firstFailTime == null) {
+ firstFailTime = System.currentTimeMillis();
+ } else if (System.currentTimeMillis() - firstFailTime > timeout) {
+ throw new TimedOutException("Failed to obtain metadata");
+ }
+ }
+
+ private void succeeded() {
+ firstFailTime = null;
+ }
+
+ public TimeoutTabletLocator(TabletLocator locator, long timeout) {
+ this.locator = locator;
+ this.timeout = timeout;
+ }
+
+ @Override
+ public TabletLocation locateTablet(Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+
+ try {
+ TabletLocation ret = locator.locateTablet(row, skipRow, retry);
+
+ if (ret == null)
+ failed();
+ else
+ succeeded();
+
+ return ret;
+ } catch (AccumuloException ae) {
+ failed();
+ throw ae;
+ }
+ }
+
+ @Override
+ public void binMutations(List<Mutation> mutations, Map<String,TabletServerMutations> binnedMutations, List<Mutation> failures) throws AccumuloException,
+ AccumuloSecurityException, TableNotFoundException {
+ try {
+ locator.binMutations(mutations, binnedMutations, failures);
+
+ if (failures.size() == mutations.size())
+ failed();
+ else
+ succeeded();
+
+ } catch (AccumuloException ae) {
+ failed();
+ throw ae;
+ }
+ }
+
+ /**
+ *
+ */
+
+ @Override
+ public List<Range> binRanges(List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException, AccumuloSecurityException,
+ TableNotFoundException {
+
+ try {
+ List<Range> ret = locator.binRanges(ranges, binnedRanges);
+
+ if (ranges.size() == ret.size())
+ failed();
+ else
+ succeeded();
+
+ return ret;
+ } catch (AccumuloException ae) {
+ failed();
+ throw ae;
+ }
+ }
+
+ @Override
+ public void invalidateCache(KeyExtent failedExtent) {
+ locator.invalidateCache(failedExtent);
+ }
+
+ @Override
+ public void invalidateCache(Collection<KeyExtent> keySet) {
+ locator.invalidateCache(keySet);
+ }
+
+ @Override
+ public void invalidateCache() {
+ locator.invalidateCache();
+ }
+
+ @Override
+ public void invalidateCache(String server) {
+ locator.invalidateCache(server);
+ }
+
+}