You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/10/14 01:35:18 UTC
svn commit: r1183164 - in /hbase/branches/0.92/src:
main/java/org/apache/hadoop/hbase/catalog/
test/java/org/apache/hadoop/hbase/catalog/
test/java/org/apache/hadoop/hbase/client/
Author: stack
Date: Thu Oct 13 23:35:18 2011
New Revision: 1183164
URL: http://svn.apache.org/viewvc?rev=1183164&view=rev
Log:
HBASE-3446 ProcessServerShutdown fails if META moves, orphaning lots of regions
Added:
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/catalog/MetaMigrationRemovingHTD.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
Added: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/catalog/MetaMigrationRemovingHTD.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/catalog/MetaMigrationRemovingHTD.java?rev=1183164&view=auto
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/catalog/MetaMigrationRemovingHTD.java (added)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/catalog/MetaMigrationRemovingHTD.java Thu Oct 13 23:35:18 2011
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.catalog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.catalog.MetaReader.Visitor;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.migration.HRegionInfo090x;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+
+/**
+ * Tools to help with migration of meta tables so they no longer host
+ * instances of HTableDescriptor.
+ * @deprecated Used migration from 0.90 to 0.92 so will be going away in next
+ * release
+ */
+public class MetaMigrationRemovingHTD {
+ private static final Log LOG = LogFactory.getLog(MetaMigrationRemovingHTD.class);
+
+ /** The metaupdated column qualifier */
+ public static final byte [] META_MIGRATION_QUALIFIER =
+ Bytes.toBytes("metamigrated");
+
+ /**
+ * Update legacy META rows, removing HTD from HRI.
+ * @param masterServices
+ * @return
+ * @throws IOException
+ */
+ public static List<HTableDescriptor> updateMetaWithNewRegionInfo(
+ final MasterServices masterServices)
+ throws IOException {
+ final List<HTableDescriptor> htds = new ArrayList<HTableDescriptor>();
+ Visitor v = new Visitor() {
+ @Override
+ public boolean visit(Result r) throws IOException {
+ if (r == null || r.isEmpty()) return true;
+ HRegionInfo090x hrfm = MetaMigrationRemovingHTD.getHRegionInfoForMigration(r);
+ if (hrfm == null) return true;
+ htds.add(hrfm.getTableDesc());
+ masterServices.getMasterFileSystem()
+ .createTableDescriptor(hrfm.getTableDesc());
+ updateHRI(masterServices.getCatalogTracker(), false, hrfm);
+ return true;
+ }
+ };
+ MetaReader.fullScan(masterServices.getCatalogTracker(), v);
+ MetaMigrationRemovingHTD.updateRootWithMetaMigrationStatus(masterServices.getCatalogTracker(), true);
+ return htds;
+ }
+
+ /**
+ * Update the ROOT with new HRI. (HRI with no HTD)
+ * @param masterServices
+ * @return
+ * @throws IOException
+ */
+ public static List<HTableDescriptor> updateRootWithNewRegionInfo(
+ final MasterServices masterServices)
+ throws IOException {
+ final List<HTableDescriptor> htds = new ArrayList<HTableDescriptor>();
+ Visitor v = new Visitor() {
+ @Override
+ public boolean visit(Result r) throws IOException {
+ if (r == null || r.isEmpty()) return true;
+ HRegionInfo090x hrfm = MetaMigrationRemovingHTD.getHRegionInfoForMigration(r);
+ if (hrfm == null) return true;
+ htds.add(hrfm.getTableDesc());
+ masterServices.getMasterFileSystem().createTableDescriptor(
+ hrfm.getTableDesc());
+ updateHRI(masterServices.getCatalogTracker(), true, hrfm);
+ return true;
+ }
+ };
+ MetaReader.fullScan(masterServices.getCatalogTracker(), v, null, true);
+ return htds;
+ }
+
+ /**
+ * Migrate root and meta to newer version. This updates the META and ROOT
+ * and removes the HTD from HRI.
+ * @param masterServices
+ * @throws IOException
+ */
+ public static void migrateRootAndMeta(final MasterServices masterServices)
+ throws IOException {
+ updateRootWithNewRegionInfo(masterServices);
+ updateMetaWithNewRegionInfo(masterServices);
+ }
+
+ /**
+ * Update the metamigrated flag in -ROOT-.
+ * @param catalogTracker
+ * @param metaUpdated
+ * @throws IOException
+ */
+ public static void updateRootWithMetaMigrationStatus(
+ CatalogTracker catalogTracker, boolean metaUpdated)
+ throws IOException {
+ Put p = new Put(HRegionInfo.ROOT_REGIONINFO.getRegionName());
+ MetaMigrationRemovingHTD.addMetaUpdateStatus(p, metaUpdated);
+ MetaEditor.putToRootTable(catalogTracker, p);
+ LOG.info("Updated -ROOT- row with metaMigrated status = " + metaUpdated);
+ }
+
+ static void updateHRI(final CatalogTracker ct, final boolean rootTable,
+ final HRegionInfo090x hRegionInfo090x)
+ throws IOException {
+ HRegionInfo regionInfo = new HRegionInfo(hRegionInfo090x);
+ Put p = new Put(regionInfo.getRegionName());
+ p.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+ Writables.getBytes(regionInfo));
+ if (rootTable) {
+ MetaEditor.putToRootTable(ct, p);
+ } else {
+ MetaEditor.putToMetaTable(ct, p);
+ }
+ LOG.info("Updated region " + regionInfo + " to " +
+ (rootTable? "-ROOT-": ".META."));
+ }
+
+ /**
+ * @deprecated Going away in 0.94; used for migrating to 0.92 only.
+ */
+ public static HRegionInfo090x getHRegionInfoForMigration(
+ Result data) throws IOException {
+ HRegionInfo090x info = null;
+ byte [] bytes =
+ data.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+ if (bytes == null) return null;
+ try {
+ info = Writables.getHRegionInfoForMigration(bytes);
+ } catch(IOException ioe) {
+ if (ioe.getMessage().equalsIgnoreCase("HTD not found in input buffer")) {
+ return null;
+ } else {
+ throw ioe;
+ }
+ }
+ LOG.info("Current INFO from scan results = " + info);
+ return info;
+ }
+
+ public static List<HRegionInfo090x> fullScanMetaAndPrintHRIM(
+ CatalogTracker catalogTracker)
+ throws IOException {
+ final List<HRegionInfo090x> regions =
+ new ArrayList<HRegionInfo090x>();
+ Visitor v = new Visitor() {
+ @Override
+ public boolean visit(Result r) throws IOException {
+ if (r == null || r.isEmpty()) return true;
+ LOG.info("fullScanMetaAndPrint1.Current Meta Result: " + r);
+ HRegionInfo090x hrim = getHRegionInfoForMigration(r);
+ LOG.info("fullScanMetaAndPrint.HRIM Print= " + hrim);
+ regions.add(hrim);
+ return true;
+ }
+ };
+ MetaReader.fullScan(catalogTracker, v);
+ return regions;
+ }
+
+ static Put addMetaUpdateStatus(final Put p, final boolean metaUpdated) {
+ p.add(HConstants.CATALOG_FAMILY,
+ MetaMigrationRemovingHTD.META_MIGRATION_QUALIFIER,
+ Bytes.toBytes(metaUpdated));
+ return p;
+ }
+
+ /**
+ * @return True if the meta table has been migrated.
+ * @throws IOException
+ */
+ // Public because used in tests
+ public static boolean isMetaHRIUpdated(final MasterServices services)
+ throws IOException {
+ boolean metaUpdated = false;
+ List<Result> results =
+ MetaReader.fullScanOfRoot(services.getCatalogTracker());
+ if (results == null || results.isEmpty()) {
+ LOG.info("metaUpdated = NULL.");
+ return metaUpdated;
+ }
+ // Presume only the one result.
+ Result r = results.get(0);
+ byte [] metaMigrated = r.getValue(HConstants.CATALOG_FAMILY,
+ MetaMigrationRemovingHTD.META_MIGRATION_QUALIFIER);
+ if (metaMigrated != null && metaMigrated.length > 0) {
+ metaUpdated = Bytes.toBoolean(metaMigrated);
+ }
+ LOG.info("Meta updated status = " + metaUpdated);
+ return metaUpdated;
+ }
+
+ /**
+ * @return True if migrated.
+ * @throws IOException
+ */
+ public static boolean updateMetaWithNewHRI(final MasterServices services)
+ throws IOException {
+ if (isMetaHRIUpdated(services)) {
+ LOG.info("ROOT/Meta already up-to date with new HRI.");
+ return true;
+ }
+ LOG.info("Meta has HRI with HTDs. Updating meta now.");
+ try {
+ migrateRootAndMeta(services);
+ LOG.info("ROOT and Meta updated with new HRI.");
+ return true;
+ } catch (IOException e) {
+ throw new RuntimeException("Update ROOT/Meta with new HRI failed." +
+ "Master startup aborted.");
+ }
+ }
+}
\ No newline at end of file
Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java?rev=1183164&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java Thu Oct 13 23:35:18 2011
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.catalog;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test MetaReader/Editor but without spinning up a cluster.
+ * We mock regionserver back and forth (we do spin up a zk cluster).
+ */
+public class TestMetaReaderEditorNoCluster {
+ private static final Log LOG = LogFactory.getLog(TestMetaReaderEditorNoCluster.class);
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static final Abortable ABORTABLE = new Abortable() {
+ boolean aborted = false;
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.info(why, e);
+ this.aborted = true;
+ throw new RuntimeException(e);
+ }
+ @Override
+ public boolean isAborted() {
+ return this.aborted;
+ }
+ };
+
+ @Before
+ public void before() throws Exception {
+ UTIL.startMiniZKCluster();
+ }
+
+ @After
+ public void after() throws IOException {
+ UTIL.shutdownMiniZKCluster();
+ }
+
+ /**
+ * Test that MetaReader will ride over server throwing
+ * "Server not running" IOEs.
+ * @see https://issues.apache.org/jira/browse/HBASE-3446
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testRideOverServerNotRunning() throws IOException, InterruptedException {
+ // Need a zk watcher.
+ ZooKeeperWatcher zkw = new ZooKeeperWatcher(UTIL.getConfiguration(),
+ this.getClass().getSimpleName(), ABORTABLE, true);
+ // This is a servername we use in a few places below.
+ ServerName sn = new ServerName("example.com", 1234, System.currentTimeMillis());
+
+ HConnection connection = null;
+ CatalogTracker ct = null;
+ try {
+ // Mock an HRegionInterface. Our mock implementation will fail a few
+ // times when we go to open a scanner.
+ final HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
+ // When openScanner called throw IOE 'Server not running' a few times
+ // before we return a scanner id. Whats WEIRD is that these
+ // exceptions do not show in the log because they are caught and only
+ // printed if we FAIL. We eventually succeed after retry so these don't
+ // show. We will know if they happened or not because we will ask
+ // mockito at the end of this test to verify that openscanner was indeed
+ // called the wanted number of times.
+ final long scannerid = 123L;
+ Mockito.when(implementation.openScanner((byte [])Mockito.any(),
+ (Scan)Mockito.any())).
+ thenThrow(new IOException("Server not running (1 of 3)")).
+ thenThrow(new IOException("Server not running (2 of 3)")).
+ thenThrow(new IOException("Server not running (3 of 3)")).
+ thenReturn(scannerid);
+ // Make it so a verifiable answer comes back when next is called. Return
+ // the verifiable answer and then a null so we stop scanning. Our
+ // verifiable answer is something that looks like a row in META with
+ // a server and startcode that is that of the above defined servername.
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ final byte [] rowToVerify = Bytes.toBytes("rowToVerify");
+ kvs.add(new KeyValue(rowToVerify,
+ HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+ Writables.getBytes(HRegionInfo.FIRST_META_REGIONINFO)));
+ kvs.add(new KeyValue(rowToVerify,
+ HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
+ Bytes.toBytes(sn.getHostAndPort())));
+ kvs.add(new KeyValue(rowToVerify,
+ HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
+ Bytes.toBytes(sn.getStartcode())));
+ final Result [] result = new Result [] {new Result(kvs)};
+ Mockito.when(implementation.next(Mockito.anyLong(), Mockito.anyInt())).
+ thenReturn(result).
+ thenReturn(null);
+
+ // Associate a spied-upon HConnection with UTIL.getConfiguration. Need
+ // to shove this in here first so it gets picked up all over; e.g. by
+ // HTable.
+ connection = HConnectionTestingUtility.getSpiedConnection(UTIL.getConfiguration());
+ // Fix the location lookup so it 'works' though no network. First
+ // make an 'any location' object.
+ final HRegionLocation anyLocation =
+ new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, sn.getHostname(),
+ sn.getPort());
+ // Return the any location object when locateRegion is called in HTable
+ // constructor and when its called by ServerCallable (it uses getRegionLocation).
+ // The ugly format below comes of 'Important gotcha on spying real objects!' from
+ // http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html
+ Mockito.doReturn(anyLocation).
+ when(connection).locateRegion((byte[]) Mockito.any(), (byte[]) Mockito.any());
+ Mockito.doReturn(anyLocation).
+ when(connection).getRegionLocation((byte[]) Mockito.any(),
+ (byte[]) Mockito.any(), Mockito.anyBoolean());
+
+ // Now shove our HRI implementation into the spied-upon connection.
+ Mockito.doReturn(implementation).
+ when(connection).getHRegionConnection(Mockito.anyString(), Mockito.anyInt());
+
+ // Now start up the catalogtracker with our doctored Connection.
+ ct = new CatalogTracker(zkw, null, connection, ABORTABLE, 0);
+ ct.start();
+ // Scan meta for user tables and verify we got back expected answer.
+ NavigableMap<HRegionInfo, Result> hris = MetaReader.getServerUserRegions(ct, sn);
+ assertTrue(hris.size() == 1);
+ assertTrue(hris.firstEntry().getKey().equals(HRegionInfo.FIRST_META_REGIONINFO));
+ assertTrue(Bytes.equals(rowToVerify, hris.firstEntry().getValue().getRow()));
+ // Finally verify that openscanner was called four times -- three times
+ // with exception and then on 4th attempt we succeed.
+ Mockito.verify(implementation, Mockito.times(4)).
+ openScanner((byte [])Mockito.any(), (Scan)Mockito.any());
+ } finally {
+ if (ct != null) ct.stop();
+ HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+ zkw.close();
+ }
+ }
+}
Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java?rev=1183164&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java Thu Oct 13 23:35:18 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
+import org.mockito.Mockito;
+
+/**
+ * {@link HConnection} testing utility.
+ */
+public class HConnectionTestingUtility {
+ /*
+ * Not part of {@link HBaseTestingUtility} because this class is not
+ * in same package as {@link HConnection}. Would have to reveal ugly
+ * {@link HConnectionManager} innards to HBaseTestingUtility to give it access.
+ */
+ /**
+ * Get a Mocked {@link HConnection} that goes with the passed <code>conf</code>
+ * configuration instance. Minimally the mock will return
+ * <code>conf</conf> when {@link HConnection#getConfiguration()} is invoked.
+ * Be sure to shutdown the connection when done by calling
+ * {@link HConnectionManager#deleteConnection(Configuration, boolean)} else it
+ * will stick around; this is probably not what you want.
+ * @param conf configuration
+ * @return HConnection object for <code>conf</code>
+ * @throws ZooKeeperConnectionException
+ */
+ public static HConnection getMockedConnection(final Configuration conf)
+ throws ZooKeeperConnectionException {
+ HConnectionKey connectionKey = new HConnectionKey(conf);
+ synchronized (HConnectionManager.HBASE_INSTANCES) {
+ HConnectionImplementation connection =
+ HConnectionManager.HBASE_INSTANCES.get(connectionKey);
+ if (connection == null) {
+ connection = Mockito.mock(HConnectionImplementation.class);
+ Mockito.when(connection.getConfiguration()).thenReturn(conf);
+ HConnectionManager.HBASE_INSTANCES.put(connectionKey, connection);
+ }
+ return connection;
+ }
+ }
+
+ /**
+ * Get a Mockito spied-upon {@link HConnection} that goes with the passed
+ * <code>conf</code> configuration instance.
+ * Be sure to shutdown the connection when done by calling
+ * {@link HConnectionManager#deleteConnection(Configuration, boolean)} else it
+ * will stick around; this is probably not what you want.
+ * @param conf configuration
+ * @return HConnection object for <code>conf</code>
+ * @throws ZooKeeperConnectionException
+ * @see http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)
+ */
+ public static HConnection getSpiedConnection(final Configuration conf)
+ throws ZooKeeperConnectionException {
+ HConnectionKey connectionKey = new HConnectionKey(conf);
+ synchronized (HConnectionManager.HBASE_INSTANCES) {
+ HConnectionImplementation connection =
+ HConnectionManager.HBASE_INSTANCES.get(connectionKey);
+ if (connection == null) {
+ connection = Mockito.spy(new HConnectionImplementation(conf));
+ HConnectionManager.HBASE_INSTANCES.put(connectionKey, connection);
+ }
+ return connection;
+ }
+ }
+}