You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/10/14 01:35:18 UTC

svn commit: r1183164 - in /hbase/branches/0.92/src: main/java/org/apache/hadoop/hbase/catalog/ test/java/org/apache/hadoop/hbase/catalog/ test/java/org/apache/hadoop/hbase/client/

Author: stack
Date: Thu Oct 13 23:35:18 2011
New Revision: 1183164

URL: http://svn.apache.org/viewvc?rev=1183164&view=rev
Log:
HBASE-3446 ProcessServerShutdown fails if META moves, orphaning lots of regions

Added:
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/catalog/MetaMigrationRemovingHTD.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java

Added: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/catalog/MetaMigrationRemovingHTD.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/catalog/MetaMigrationRemovingHTD.java?rev=1183164&view=auto
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/catalog/MetaMigrationRemovingHTD.java (added)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/catalog/MetaMigrationRemovingHTD.java Thu Oct 13 23:35:18 2011
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.catalog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.catalog.MetaReader.Visitor;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.migration.HRegionInfo090x;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+
+/**
+ * Tools to help with migration of meta tables so they no longer host
+ * instances of HTableDescriptor.
+ * @deprecated Used for the migration from 0.90 to 0.92, so it will be going
+ * away in the next release.
+ */
+public class MetaMigrationRemovingHTD {
+  private static final Log LOG = LogFactory.getLog(MetaMigrationRemovingHTD.class);
+
+  /**
+   * Column qualifier ("metamigrated") of the boolean flag cell that records
+   * whether the catalog migration has been run.  The cell is written under
+   * {@link HConstants#CATALOG_FAMILY}.
+   */
+  public static final byte [] META_MIGRATION_QUALIFIER =
+    Bytes.toBytes("metamigrated");
+
+  /**
+   * Rewrites legacy .META. rows so that the stored HRegionInfo no longer
+   * carries an HTableDescriptor, then records the migration as done in -ROOT-.
+   * <p>For every non-empty row that still deserializes as a 0.90-style region
+   * info, the embedded table descriptor is collected, passed to the master
+   * filesystem's <code>createTableDescriptor</code>, and the row is re-written
+   * with a new-style HRegionInfo.
+   * @param masterServices supplies the catalog tracker and master filesystem
+   * @return the table descriptors taken from the migrated rows, one entry per
+   * migrated row
+   * @throws IOException if the scan or any catalog update fails
+   */
+  public static List<HTableDescriptor> updateMetaWithNewRegionInfo(
+      final MasterServices masterServices)
+  throws IOException {
+    final List<HTableDescriptor> migrated = new ArrayList<HTableDescriptor>();
+    Visitor metaRowVisitor = new Visitor() {
+      @Override
+      public boolean visit(Result row) throws IOException {
+        if (row != null && !row.isEmpty()) {
+          HRegionInfo090x legacy = getHRegionInfoForMigration(row);
+          // A null here means there is nothing to migrate in this row.
+          if (legacy != null) {
+            migrated.add(legacy.getTableDesc());
+            masterServices.getMasterFileSystem().createTableDescriptor(
+              legacy.getTableDesc());
+            updateHRI(masterServices.getCatalogTracker(), false, legacy);
+          }
+        }
+        // Always keep scanning; skipped rows do not end the scan.
+        return true;
+      }
+    };
+    MetaReader.fullScan(masterServices.getCatalogTracker(), metaRowVisitor);
+    updateRootWithMetaMigrationStatus(masterServices.getCatalogTracker(), true);
+    return migrated;
+  }
+
+  /**
+   * Rewrites legacy -ROOT- rows so that the stored HRegionInfo no longer
+   * carries an HTableDescriptor.
+   * <p>For every non-empty row that still deserializes as a 0.90-style region
+   * info, the embedded table descriptor is collected, passed to the master
+   * filesystem's <code>createTableDescriptor</code>, and the row is re-written
+   * to -ROOT- with a new-style HRegionInfo.
+   * @param masterServices supplies the catalog tracker and master filesystem
+   * @return the table descriptors taken from the migrated rows, one entry per
+   * migrated row
+   * @throws IOException if the scan or any catalog update fails
+   */
+  public static List<HTableDescriptor> updateRootWithNewRegionInfo(
+      final MasterServices masterServices)
+  throws IOException {
+    final List<HTableDescriptor> migrated = new ArrayList<HTableDescriptor>();
+    Visitor rootRowVisitor = new Visitor() {
+      @Override
+      public boolean visit(Result row) throws IOException {
+        if (row != null && !row.isEmpty()) {
+          HRegionInfo090x legacy = getHRegionInfoForMigration(row);
+          // A null here means there is nothing to migrate in this row.
+          if (legacy != null) {
+            migrated.add(legacy.getTableDesc());
+            masterServices.getMasterFileSystem().createTableDescriptor(
+              legacy.getTableDesc());
+            updateHRI(masterServices.getCatalogTracker(), true, legacy);
+          }
+        }
+        // Always keep scanning; skipped rows do not end the scan.
+        return true;
+      }
+    };
+    MetaReader.fullScan(masterServices.getCatalogTracker(), rootRowVisitor,
+      null, true);
+    return migrated;
+  }
+
+  /**
+   * Runs the full catalog migration: -ROOT- is rewritten first, then .META.,
+   * so that neither stores an HTableDescriptor inside its HRegionInfo.
+   * The descriptor lists returned by the two steps are discarded.
+   * @param masterServices supplies the catalog tracker and master filesystem
+   * @throws IOException if either step fails
+   */
+  public static void migrateRootAndMeta(final MasterServices masterServices)
+      throws IOException {
+    MetaMigrationRemovingHTD.updateRootWithNewRegionInfo(masterServices);
+    MetaMigrationRemovingHTD.updateMetaWithNewRegionInfo(masterServices);
+  }
+
+  /**
+   * Writes the "metamigrated" flag into the -ROOT- table, on the row keyed by
+   * the root region's name.
+   * @param catalogTracker tracker used to reach the -ROOT- table
+   * @param metaUpdated flag value to store
+   * @throws IOException if the put to -ROOT- fails
+   */
+  public static void updateRootWithMetaMigrationStatus(
+      CatalogTracker catalogTracker, boolean metaUpdated)
+  throws IOException {
+    // addMetaUpdateStatus returns the Put it was handed, with the flag added.
+    Put flagPut = addMetaUpdateStatus(
+      new Put(HRegionInfo.ROOT_REGIONINFO.getRegionName()), metaUpdated);
+    MetaEditor.putToRootTable(catalogTracker, flagPut);
+    LOG.info("Updated -ROOT- row with metaMigrated status = " + metaUpdated);
+  }
+
+  static void updateHRI(final CatalogTracker ct, final boolean rootTable,
+    final HRegionInfo090x hRegionInfo090x)
+  throws IOException {
+    HRegionInfo regionInfo = new HRegionInfo(hRegionInfo090x);
+    Put p = new Put(regionInfo.getRegionName());
+    p.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+      Writables.getBytes(regionInfo));
+    if (rootTable) {
+      MetaEditor.putToRootTable(ct, p);
+    } else {
+      MetaEditor.putToMetaTable(ct, p);
+    }
+    LOG.info("Updated region " + regionInfo + " to " +
+      (rootTable? "-ROOT-": ".META."));
+  }
+
+  /**
+   * Deserializes the regioninfo cell of a catalog row as a 0.90-style
+   * {@link HRegionInfo090x}.
+   * @param data catalog row to read the regioninfo cell from
+   * @return the legacy region info, or null if the row has no regioninfo cell
+   * or if deserialization failed with "HTD not found in input buffer"
+   * @throws IOException on any other deserialization failure
+   * @deprecated Going away in 0.94; used for migrating to 0.92 only.
+   */
+  public static HRegionInfo090x getHRegionInfoForMigration(
+      Result data) throws IOException {
+    byte [] bytes =
+      data.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+    if (bytes == null) return null;
+    HRegionInfo090x info = null;
+    try {
+      info = Writables.getHRegionInfoForMigration(bytes);
+    } catch(IOException ioe) {
+      // Compare literal-first: getMessage() may be null, and an NPE here would
+      // hide the original IOException from the caller.
+      if ("HTD not found in input buffer".equalsIgnoreCase(ioe.getMessage())) {
+        return null;
+      }
+      throw ioe;
+    }
+    LOG.info("Current INFO from scan results = " + info);
+    return info;
+  }
+
+  /**
+   * Scans the catalog with {@link MetaReader#fullScan}, logging each non-empty
+   * row and the legacy region info read from it.
+   * @param catalogTracker tracker used for the scan
+   * @return one entry per non-empty row visited; an entry is null when
+   * {@link #getHRegionInfoForMigration(Result)} returned null for that row
+   * @throws IOException if the scan or deserialization fails
+   */
+  public static List<HRegionInfo090x> fullScanMetaAndPrintHRIM(
+      CatalogTracker catalogTracker)
+  throws IOException {
+    final List<HRegionInfo090x> collected = new ArrayList<HRegionInfo090x>();
+    Visitor printingVisitor = new Visitor() {
+      @Override
+      public boolean visit(Result row) throws IOException {
+        if (row != null && !row.isEmpty()) {
+          LOG.info("fullScanMetaAndPrint1.Current Meta Result: " + row);
+          HRegionInfo090x legacy = getHRegionInfoForMigration(row);
+          LOG.info("fullScanMetaAndPrint.HRIM Print= " + legacy);
+          // Added unconditionally, so null results are recorded too.
+          collected.add(legacy);
+        }
+        return true;
+      }
+    };
+    MetaReader.fullScan(catalogTracker, printingVisitor);
+    return collected;
+  }
+
+  static Put addMetaUpdateStatus(final Put p, final boolean metaUpdated) {
+    p.add(HConstants.CATALOG_FAMILY,
+      MetaMigrationRemovingHTD.META_MIGRATION_QUALIFIER,
+      Bytes.toBytes(metaUpdated));
+    return p;
+  }
+
+  /**
+   * Reads the "metamigrated" flag from the first row returned by a full scan
+   * of -ROOT-.
+   * @param services supplies the catalog tracker
+   * @return True if the meta table has been migrated; false if the scan
+   * returned nothing or if the flag cell is missing or empty
+   * @throws IOException if the scan of -ROOT- fails
+   */
+  // Public because used in tests
+  public static boolean isMetaHRIUpdated(final MasterServices services)
+      throws IOException {
+    List<Result> rootRows =
+      MetaReader.fullScanOfRoot(services.getCatalogTracker());
+    if (rootRows == null || rootRows.isEmpty()) {
+      LOG.info("metaUpdated = NULL.");
+      return false;
+    }
+    // Presume only the one result.
+    byte [] flag = rootRows.get(0).getValue(HConstants.CATALOG_FAMILY,
+      META_MIGRATION_QUALIFIER);
+    // A missing or empty cell counts as "not migrated".
+    boolean migrated = flag != null && flag.length > 0 && Bytes.toBoolean(flag);
+    LOG.info("Meta updated status = " + migrated);
+    return migrated;
+  }
+
+  /**
+   * Migrates -ROOT- and .META. to the new HRegionInfo format unless the
+   * "metamigrated" flag says it has already been done.
+   * @param services supplies the catalog tracker and master filesystem
+   * @return True if migrated (either already, or by this call).
+   * @throws IOException if checking the migration flag fails
+   * @throws RuntimeException if the migration itself fails; the underlying
+   * IOException is attached as the cause
+   */
+  public static boolean updateMetaWithNewHRI(final MasterServices services)
+  throws IOException {
+    if (isMetaHRIUpdated(services)) {
+      LOG.info("ROOT/Meta already up-to date with new HRI.");
+      return true;
+    }
+    LOG.info("Meta has HRI with HTDs. Updating meta now.");
+    try {
+      migrateRootAndMeta(services);
+      LOG.info("ROOT and Meta updated with new HRI.");
+      return true;
+    } catch (IOException e) {
+      // Keep the cause so the reason for the failed migration is not lost.
+      throw new RuntimeException("Update ROOT/Meta with new HRI failed. " +
+        "Master startup aborted.", e);
+    }
+  }
+}
\ No newline at end of file

Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java?rev=1183164&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java Thu Oct 13 23:35:18 2011
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.catalog;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test MetaReader/Editor but without spinning up a cluster.
+ * We mock regionserver back and forth (we do spin up a zk cluster).
+ */
+public class TestMetaReaderEditorNoCluster {
+  private static final Log LOG = LogFactory.getLog(TestMetaReaderEditorNoCluster.class);
+  private static final  HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  /**
+   * Abortable handed to the ZooKeeper watcher and catalog tracker in this test.
+   * An abort is logged, remembered, and rethrown as a RuntimeException.
+   */
+  private static final Abortable ABORTABLE = new Abortable() {
+    // Set by abort(); never reset.
+    boolean abortRequested = false;
+
+    @Override
+    public boolean isAborted()  {
+      return abortRequested;
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+      LOG.info(why, e);
+      abortRequested = true;
+      // Surface the abort to the caller as an unchecked exception.
+      throw new RuntimeException(e);
+    }
+  };
+
+  /**
+   * Starts the mini ZooKeeper cluster before each test.
+   * @throws Exception if the cluster fails to start
+   */
+  @Before
+  public void before() throws Exception {
+    UTIL.startMiniZKCluster();
+  }
+
+  /**
+   * Shuts down the mini ZooKeeper cluster after each test.
+   * @throws IOException if shutdown fails
+   */
+  @After
+  public void after() throws IOException {
+    UTIL.shutdownMiniZKCluster();
+  }
+
+  /**
+   * Test that MetaReader will ride over server throwing
+   * "Server not running" IOEs.
+   * The region server is a Mockito mock whose <code>openScanner</code> fails
+   * three times before returning a scanner id; the test passes only if the
+   * scan still returns the expected row and <code>openScanner</code> was
+   * called exactly four times.
+   * @see <a href="https://issues.apache.org/jira/browse/HBASE-3446">HBASE-3446</a>
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testRideOverServerNotRunning() throws IOException, InterruptedException {
+    // Need a zk watcher.
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(UTIL.getConfiguration(),
+      this.getClass().getSimpleName(), ABORTABLE, true);
+    // This is a servername we use in a few places below.
+    ServerName sn = new ServerName("example.com", 1234, System.currentTimeMillis());
+
+    HConnection connection = null;
+    CatalogTracker ct = null;
+    try {
+      // Mock an HRegionInterface. Our mock implementation will fail a few
+      // times when we go to open a scanner.
+      final HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
+      // When openScanner is called, throw IOE 'Server not running' a few times
+      // before we return a scanner id.  What's weird is that these
+      // exceptions do not show in the log because they are caught and only
+      // printed if we FAIL.  We eventually succeed after retry so these don't
+      // show.  We will know if they happened or not because we will ask
+      // mockito at the end of this test to verify that openScanner was indeed
+      // called the wanted number of times.
+      final long scannerid = 123L;
+      Mockito.when(implementation.openScanner((byte [])Mockito.any(),
+          (Scan)Mockito.any())).
+        thenThrow(new IOException("Server not running (1 of 3)")).
+        thenThrow(new IOException("Server not running (2 of 3)")).
+        thenThrow(new IOException("Server not running (3 of 3)")).
+        thenReturn(scannerid);
+      // Make it so a verifiable answer comes back when next is called.  Return
+      // the verifiable answer and then a null so we stop scanning.  Our
+      // verifiable answer is something that looks like a row in META with
+      // a server and startcode that is that of the above defined servername.
+      List<KeyValue> kvs = new ArrayList<KeyValue>();
+      final byte [] rowToVerify = Bytes.toBytes("rowToVerify");
+      kvs.add(new KeyValue(rowToVerify,
+        HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+        Writables.getBytes(HRegionInfo.FIRST_META_REGIONINFO)));
+      kvs.add(new KeyValue(rowToVerify,
+        HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
+        Bytes.toBytes(sn.getHostAndPort())));
+      kvs.add(new KeyValue(rowToVerify,
+        HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
+        Bytes.toBytes(sn.getStartcode())));
+      final Result [] result = new Result [] {new Result(kvs)};
+      Mockito.when(implementation.next(Mockito.anyLong(), Mockito.anyInt())).
+        thenReturn(result).
+        thenReturn(null);
+
+      // Associate a spied-upon HConnection with UTIL.getConfiguration.  Need
+      // to shove this in here first so it gets picked up all over; e.g. by
+      // HTable.
+      connection = HConnectionTestingUtility.getSpiedConnection(UTIL.getConfiguration());
+      // Fix the location lookup so it 'works' though no network.  First
+      // make an 'any location' object.
+      final HRegionLocation anyLocation =
+        new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, sn.getHostname(),
+          sn.getPort());
+      // Return the any location object when locateRegion is called in HTable
+      // constructor and when it's called by ServerCallable (it uses getRegionLocation).
+      // The ugly format below comes of 'Important gotcha on spying real objects!' from
+      // http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html
+      Mockito.doReturn(anyLocation).
+        when(connection).locateRegion((byte[]) Mockito.any(), (byte[]) Mockito.any());
+      Mockito.doReturn(anyLocation).
+        when(connection).getRegionLocation((byte[]) Mockito.any(),
+          (byte[]) Mockito.any(), Mockito.anyBoolean());
+
+      // Now shove our HRI implementation into the spied-upon connection.
+      Mockito.doReturn(implementation).
+        when(connection).getHRegionConnection(Mockito.anyString(), Mockito.anyInt());
+
+      // Now start up the catalogtracker with our doctored Connection.
+      ct = new CatalogTracker(zkw, null, connection, ABORTABLE, 0);
+      ct.start();
+      // Scan meta for user tables and verify we got back expected answer.
+      NavigableMap<HRegionInfo, Result> hris = MetaReader.getServerUserRegions(ct, sn);
+      assertTrue(hris.size() == 1);
+      assertTrue(hris.firstEntry().getKey().equals(HRegionInfo.FIRST_META_REGIONINFO));
+      assertTrue(Bytes.equals(rowToVerify, hris.firstEntry().getValue().getRow()));
+      // Finally verify that openScanner was called four times -- three times
+      // with exception and then on 4th attempt we succeed.
+      Mockito.verify(implementation, Mockito.times(4)).
+        openScanner((byte [])Mockito.any(), (Scan)Mockito.any());
+    } finally {
+      // Tear down in reverse order of setup; the connection is removed from
+      // HConnectionManager by configuration, so it is cleaned up even though
+      // the local reference is not used here.
+      if (ct != null) ct.stop();
+      HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+      zkw.close();
+    }
+  }
+}

Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java?rev=1183164&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java Thu Oct 13 23:35:18 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
+import org.mockito.Mockito;
+
+/**
+ * {@link HConnection} testing utility.
+ */
+public class HConnectionTestingUtility {
+  /*
+   * Not part of {@link HBaseTestingUtility} because that class is not in the
+   * same package as {@link HConnection}.  Would have to reveal ugly
+   * {@link HConnectionManager} innards to HBaseTestingUtility to give it access.
+   */
+  /**
+   * Get a mocked {@link HConnection} that goes with the passed <code>conf</code>
+   * configuration instance.  If a connection is already registered for
+   * <code>conf</code> it is returned as is; otherwise a new Mockito mock is
+   * registered whose {@link HConnection#getConfiguration()} returns
+   * <code>conf</code>.
+   * Be sure to shutdown the connection when done by calling
+   * {@link HConnectionManager#deleteConnection(Configuration, boolean)} else it
+   * will stick around; this is probably not what you want.
+   * @param conf configuration
+   * @return HConnection object for <code>conf</code>
+   * @throws ZooKeeperConnectionException
+   */
+  public static HConnection getMockedConnection(final Configuration conf)
+  throws ZooKeeperConnectionException {
+    final HConnectionKey key = new HConnectionKey(conf);
+    synchronized (HConnectionManager.HBASE_INSTANCES) {
+      HConnectionImplementation registered =
+        HConnectionManager.HBASE_INSTANCES.get(key);
+      if (registered != null) {
+        return registered;
+      }
+      HConnectionImplementation mocked =
+        Mockito.mock(HConnectionImplementation.class);
+      Mockito.when(mocked.getConfiguration()).thenReturn(conf);
+      HConnectionManager.HBASE_INSTANCES.put(key, mocked);
+      return mocked;
+    }
+  }
+
+  /**
+   * Get a Mockito spied-upon {@link HConnection} that goes with the passed
+   * <code>conf</code> configuration instance.  If a connection is already
+   * registered for <code>conf</code> it is returned as is; otherwise a spy
+   * wrapping a new connection implementation is created and registered.
+   * Be sure to shutdown the connection when done by calling
+   * {@link HConnectionManager#deleteConnection(Configuration, boolean)} else it
+   * will stick around; this is probably not what you want.
+   * @param conf configuration
+   * @return HConnection object for <code>conf</code>
+   * @throws ZooKeeperConnectionException
+   * @see <a href="http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)">Mockito.spy</a>
+   */
+  public static HConnection getSpiedConnection(final Configuration conf)
+  throws ZooKeeperConnectionException {
+    final HConnectionKey key = new HConnectionKey(conf);
+    synchronized (HConnectionManager.HBASE_INSTANCES) {
+      HConnectionImplementation registered =
+        HConnectionManager.HBASE_INSTANCES.get(key);
+      if (registered != null) {
+        return registered;
+      }
+      HConnectionImplementation spied =
+        Mockito.spy(new HConnectionImplementation(conf));
+      HConnectionManager.HBASE_INSTANCES.put(key, spied);
+      return spied;
+    }
+  }
+}