You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this export.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/01/06 21:19:42 UTC

git commit: ACCUMULO-2128 Revert "ACCUMULO-1889 found a few more ZooKeeperInstances that are not closed"

Updated Branches:
  refs/heads/1.6.0-SNAPSHOT cb1243a8a -> 016f3bb10


ACCUMULO-2128 Revert "ACCUMULO-1889 found a few more ZooKeeperInstances that are not closed"

This reverts commit 674fa95cacaa9353142071a66006e0ffb65cae94.

Conflicts:
	core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/016f3bb1
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/016f3bb1
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/016f3bb1

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 016f3bb10c43f6461c5d41025b0e07b50f1638a2
Parents: cb1243a
Author: Keith Turner <kt...@apache.org>
Authored: Mon Jan 6 15:21:30 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 15:21:30 2014 -0500

----------------------------------------------------------------------
 .../client/mapreduce/AbstractInputFormat.java   | 254 +++++++++----------
 .../client/mapreduce/AccumuloOutputFormat.java  |   5 +-
 .../mapreduce/lib/util/InputConfigurator.java   |   6 +-
 3 files changed, 127 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/016f3bb1/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index 03c6a0a..35587d4 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -525,148 +525,144 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
     Level logLevel = getLogLevel(context);
     log.setLevel(logLevel);
     validateOptions(context);
-    
+
     LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
     Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(context);
-    Instance instance = getInstance(context);
-    try {
-      for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {
-        
-        String tableName = tableConfigEntry.getKey();
-        InputTableConfig tableConfig = tableConfigEntry.getValue();
-        
-        boolean mockInstance;
-        String tableId;
-        // resolve table name to id once, and use id from this point forward
-        if (instance instanceof MockInstance) {
-          tableId = "";
-          mockInstance = true;
-        } else {
-          try {
-            tableId = Tables.getTableId(instance, tableName);
-          } catch (TableNotFoundException e) {
-            throw new IOException(e);
-          }
-          mockInstance = false;
-        }
-        
-        Authorizations auths = getScanAuthorizations(context);
-        String principal = getPrincipal(context);
-        AuthenticationToken token = getAuthenticationToken(context);
-        
-        boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
-        List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges();
-        if (ranges.isEmpty()) {
-          ranges = new ArrayList<Range>(1);
-          ranges.add(new Range());
-        }
-        
-        // get the metadata information for these ranges
-        Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-        TabletLocator tl;
+    for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {
+
+      String tableName = tableConfigEntry.getKey();
+      InputTableConfig tableConfig = tableConfigEntry.getValue();
+      
+      Instance instance = getInstance(context);
+      boolean mockInstance;
+      String tableId;
+      // resolve table name to id once, and use id from this point forward
+      if (instance instanceof MockInstance) {
+        tableId = "";
+        mockInstance = true;
+      } else {
         try {
-          if (tableConfig.isOfflineScan()) {
+          tableId = Tables.getTableId(instance, tableName);
+        } catch (TableNotFoundException e) {
+          throw new IOException(e);
+        }
+        mockInstance = false;
+      }
+      
+      Authorizations auths = getScanAuthorizations(context);
+      String principal = getPrincipal(context);
+      AuthenticationToken token = getAuthenticationToken(context);
+
+      boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
+      List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges();
+      if (ranges.isEmpty()) {
+        ranges = new ArrayList<Range>(1);
+        ranges.add(new Range());
+      }
+
+      // get the metadata information for these ranges
+      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+      TabletLocator tl;
+      try {
+        if (tableConfig.isOfflineScan()) {
+          binnedRanges = binOfflineTable(context, tableId, ranges);
+          while (binnedRanges == null) {
+            // Some tablets were still online, try again
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
             binnedRanges = binOfflineTable(context, tableId, ranges);
-            while (binnedRanges == null) {
-              // Some tablets were still online, try again
-              UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-              binnedRanges = binOfflineTable(context, tableId, ranges);
-              
+
+          }
+        } else {
+          tl = getTabletLocator(context, tableId);
+          // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
+          tl.invalidateCache();
+          Credentials creds = new Credentials(getPrincipal(context), getAuthenticationToken(context));
+
+          while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
+            if (!(instance instanceof MockInstance)) {
+              if (!Tables.exists(instance, tableId))
+                throw new TableDeletedException(tableId);
+              if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+                throw new TableOfflineException(instance, tableId);
             }
-          } else {
-            tl = getTabletLocator(context, tableId);
-            // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
+            binnedRanges.clear();
+            log.warn("Unable to locate bins for specified ranges. Retrying.");
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
             tl.invalidateCache();
-            Credentials creds = new Credentials(getPrincipal(context), getAuthenticationToken(context));
-            
-            while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
-              if (!(instance instanceof MockInstance)) {
-                if (!Tables.exists(instance, tableId))
-                  throw new TableDeletedException(tableId);
-                if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
-                  throw new TableOfflineException(instance, tableId);
-              }
-              binnedRanges.clear();
-              log.warn("Unable to locate bins for specified ranges. Retrying.");
-              UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-              tl.invalidateCache();
-            }
           }
-        } catch (Exception e) {
-          throw new IOException(e);
         }
-        
-        HashMap<Range,ArrayList<String>> splitsToAdd = null;
-        
-        if (!autoAdjust)
-          splitsToAdd = new HashMap<Range,ArrayList<String>>();
-        
-        HashMap<String,String> hostNameCache = new HashMap<String,String>();
-        for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
-          String ip = tserverBin.getKey().split(":", 2)[0];
-          String location = hostNameCache.get(ip);
-          if (location == null) {
-            InetAddress inetAddress = InetAddress.getByName(ip);
-            location = inetAddress.getCanonicalHostName();
-            hostNameCache.put(ip, location);
-          }
-          for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
-            Range ke = extentRanges.getKey().toDataRange();
-            for (Range r : extentRanges.getValue()) {
-              if (autoAdjust) {
-                // divide ranges into smaller ranges, based on the tablets
-                RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location});
-                
-                split.setOffline(tableConfig.isOfflineScan());
-                split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
-                split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
-                split.setMockInstance(mockInstance);
-                split.setFetchedColumns(tableConfig.getFetchedColumns());
-                split.setPrincipal(principal);
-                split.setToken(token);
-                split.setInstanceName(instance.getInstanceName());
-                split.setZooKeepers(instance.getZooKeepers());
-                split.setAuths(auths);
-                split.setIterators(tableConfig.getIterators());
-                split.setLogLevel(logLevel);
-                
-                splits.add(split);
-              } else {
-                // don't divide ranges
-                ArrayList<String> locations = splitsToAdd.get(r);
-                if (locations == null)
-                  locations = new ArrayList<String>(1);
-                locations.add(location);
-                splitsToAdd.put(r, locations);
-              }
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+
+      HashMap<Range,ArrayList<String>> splitsToAdd = null;
+
+      if (!autoAdjust)
+        splitsToAdd = new HashMap<Range,ArrayList<String>>();
+
+      HashMap<String,String> hostNameCache = new HashMap<String,String>();
+      for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
+        String ip = tserverBin.getKey().split(":", 2)[0];
+        String location = hostNameCache.get(ip);
+        if (location == null) {
+          InetAddress inetAddress = InetAddress.getByName(ip);
+          location = inetAddress.getCanonicalHostName();
+          hostNameCache.put(ip, location);
+        }
+        for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
+          Range ke = extentRanges.getKey().toDataRange();
+          for (Range r : extentRanges.getValue()) {
+            if (autoAdjust) {
+              // divide ranges into smaller ranges, based on the tablets
+              RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location});
+              
+              split.setOffline(tableConfig.isOfflineScan());
+              split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
+              split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
+              split.setMockInstance(mockInstance);
+              split.setFetchedColumns(tableConfig.getFetchedColumns());
+              split.setPrincipal(principal);
+              split.setToken(token);
+              split.setInstanceName(instance.getInstanceName());
+              split.setZooKeepers(instance.getZooKeepers());
+              split.setAuths(auths);
+              split.setIterators(tableConfig.getIterators());
+              split.setLogLevel(logLevel);
+              
+              splits.add(split);
+            } else {
+              // don't divide ranges
+              ArrayList<String> locations = splitsToAdd.get(r);
+              if (locations == null)
+                locations = new ArrayList<String>(1);
+              locations.add(location);
+              splitsToAdd.put(r, locations);
             }
           }
         }
-        
-        if (!autoAdjust)
-          for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) {
-            RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0]));
-            
-            split.setOffline(tableConfig.isOfflineScan());
-            split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
-            split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
-            split.setMockInstance(mockInstance);
-            split.setFetchedColumns(tableConfig.getFetchedColumns());
-            split.setPrincipal(principal);
-            split.setToken(token);
-            split.setInstanceName(instance.getInstanceName());
-            split.setZooKeepers(instance.getZooKeepers());
-            split.setAuths(auths);
-            split.setIterators(tableConfig.getIterators());
-            split.setLogLevel(logLevel);
-            
-            splits.add(split);
-          }
       }
-      return splits;
-    } finally {
-      instance.close();
+
+      if (!autoAdjust)
+        for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) {
+          RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0]));
+
+          split.setOffline(tableConfig.isOfflineScan());
+          split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
+          split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
+          split.setMockInstance(mockInstance);
+          split.setFetchedColumns(tableConfig.getFetchedColumns());
+          split.setPrincipal(principal);
+          split.setToken(token);
+          split.setInstanceName(instance.getInstanceName());
+          split.setZooKeepers(instance.getZooKeepers());
+          split.setAuths(auths);
+          split.setIterators(tableConfig.getIterators());
+          split.setLogLevel(logLevel);
+          
+          splits.add(split);
+        }
     }
+    return splits;
   }
 
   // use reflection to pull the Configuration out of the JobContext for Hadoop 1 and Hadoop 2 compatibility

http://git-wip-us.apache.org/repos/asf/accumulo/blob/016f3bb1/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index b816d43..0c924b1 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -533,20 +533,17 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   public void checkOutputSpecs(JobContext job) throws IOException {
     if (!isConnectorInfoSet(job))
       throw new IOException("Connector info has not been set.");
-    Instance instance = getInstance(job);
     try {
       // if the instance isn't configured, it will complain here
       String principal = getPrincipal(job);
       AuthenticationToken token = getAuthenticationToken(job);
-      Connector c = instance.getConnector(principal, token);
+      Connector c = getInstance(job).getConnector(principal, token);
       if (!c.securityOperations().authenticateUser(principal, token))
         throw new IOException("Unable to authenticate user");
     } catch (AccumuloException e) {
       throw new IOException(e);
     } catch (AccumuloSecurityException e) {
       throw new IOException(e);
-    } finally {
-      instance.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/016f3bb1/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
index 9454d59..7b17d11 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
@@ -625,12 +625,10 @@ public class InputConfigurator extends ConfiguratorBase {
     if (!"MockInstance".equals(instanceKey) && !"ZooKeeperInstance".equals(instanceKey))
       throw new IOException("Instance info has not been set.");
     // validate that we can connect as configured
-    Instance inst = getInstance(implementingClass, conf);
     try {
       String principal = getPrincipal(implementingClass, conf);
       AuthenticationToken token = getAuthenticationToken(implementingClass, conf);
-
-      Connector c = inst.getConnector(principal, token);
+      Connector c = getInstance(implementingClass, conf).getConnector(principal, token);
       if (!c.securityOperations().authenticateUser(principal, token))
         throw new IOException("Unable to authenticate user");
 
@@ -658,8 +656,6 @@ public class InputConfigurator extends ConfiguratorBase {
       throw new IOException(e);
     } catch (TableNotFoundException e) {
       throw new IOException(e);
-    } finally {
-      inst.close();
     }
   }