Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/12/30 14:17:33 UTC

sqoop git commit: SQOOP-2542: Sqoop2: Provide test infrastructure base class for connector tests

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 a03adec29 -> 8d63df714


SQOOP-2542: Sqoop2: Provide test infrastructure base class for connector tests

(Dian Fu via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/8d63df71
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/8d63df71
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/8d63df71

Branch: refs/heads/sqoop2
Commit: 8d63df7145c69baac3d9fd87ecad88ca5b2f2132
Parents: a03adec
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Dec 30 05:16:40 2015 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Dec 30 05:16:40 2015 -0800

----------------------------------------------------------------------
 .../sqoop/common/test/kafka/TestUtil.java       |   2 +-
 .../sqoop/test/hive/MetastoreServerRunner.java  |  28 +++-
 .../test/infrastructure/SqoopTestCase.java      | 165 +++++++++++++++++--
 .../providers/HiveInfrastructureProvider.java   | 153 +++++++++++++++++
 .../providers/KafkaInfrastructureProvider.java  |  78 +++++++++
 .../test/testcases/HiveConnectorTestCase.java   |  94 -----------
 .../test/testcases/KafkaConnectorTestCase.java  |  87 ----------
 .../connector/hdfs/AppendModeTest.java          |  13 +-
 .../connector/hdfs/FromHDFSToHDFSTest.java      |   9 +-
 .../connector/hdfs/HdfsIncrementalReadTest.java |  10 +-
 .../hdfs/InformalJobNameExecuteTest.java        |  10 +-
 .../connector/hdfs/OutputDirectoryTest.java     |  14 +-
 .../integration/connector/hdfs/S3Test.java      |  10 +-
 .../connector/hive/FromRDBMSToKiteHiveTest.java |  20 ++-
 .../connector/jdbc/generic/AllTypesTest.java    |  11 +-
 .../jdbc/generic/FromHDFSToRDBMSTest.java       |  16 +-
 .../jdbc/generic/FromRDBMSToHDFSTest.java       |  10 +-
 .../jdbc/generic/IncrementalReadTest.java       |  15 +-
 .../connector/jdbc/generic/PartitionerTest.java |  13 +-
 .../jdbc/generic/TableStagedRDBMSTest.java      |  14 +-
 .../connector/kafka/FromHDFSToKafkaTest.java    |  19 ++-
 .../connector/kafka/FromRDBMSToKafkaTest.java   |  20 ++-
 .../connector/kite/FromRDBMSToKiteTest.java     |  15 +-
 23 files changed, 551 insertions(+), 275 deletions(-)
----------------------------------------------------------------------
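
Taken together, the patch replaces the per-technology base classes (HiveConnectorTestCase and KafkaConnectorTestCase, both deleted below) with a single SqoopTestCase whose miniclusters are declared via the @Infrastructure annotation. A minimal sketch of the new pattern, modeled on the updated tests in this commit (the class name and test body are illustrative only):

  import org.apache.sqoop.test.infrastructure.Infrastructure;
  import org.apache.sqoop.test.infrastructure.SqoopTestCase;
  import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
  import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
  import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
  import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
  import org.testng.annotations.Test;

  // Declare the infrastructure this test needs; SqoopTestCase boots the
  // corresponding miniclusters before the suite runs.
  @Infrastructure(dependencies = {KdcInfrastructureProvider.class,
      HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class,
      DatabaseInfrastructureProvider.class})
  public class ExampleConnectorTest extends SqoopTestCase {
    @Test
    public void test() throws Exception {
      createAndLoadTableCities();   // helper inherited from the base class
      // ...create links and a job via getClient(), fill configs with the
      // fillRdbms*/fillHdfs* helpers, then:
      // executeJob(job);
    }
  }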


http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java
index 54e2ef4..8ca298d 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java
@@ -67,6 +67,7 @@ public class TestUtil {
   }
 
   public void initTopicList(List<String> topics) {
+    kafkaConsumer = new KafkaConsumer();
     getKafkaConsumer().initTopicList(topics);
   }
 
@@ -85,7 +86,6 @@ public class TestUtil {
     } catch (InterruptedException e) {
       // ignore
     }
-    getKafkaConsumer();
     logger.info("Completed the prepare phase.");
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/main/java/org/apache/sqoop/test/hive/MetastoreServerRunner.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/hive/MetastoreServerRunner.java b/test/src/main/java/org/apache/sqoop/test/hive/MetastoreServerRunner.java
index 8a822b5..3aa0513 100644
--- a/test/src/main/java/org/apache/sqoop/test/hive/MetastoreServerRunner.java
+++ b/test/src/main/java/org/apache/sqoop/test/hive/MetastoreServerRunner.java
@@ -23,7 +23,6 @@ import org.apache.log4j.Logger;
 import org.apache.sqoop.common.test.utils.NetworkUtils;
 
 import java.io.ByteArrayOutputStream;
-import java.io.FileNotFoundException;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
 import java.util.UUID;
@@ -41,6 +40,11 @@ public abstract class MetastoreServerRunner {
   private final int port;
   private final String warehouseDirectory;
 
+  /**
+   * Temporary path that can be used as a root for other metastore directories.
+   */
+  private String temporaryPath;
+
   public MetastoreServerRunner(String hostname, int port) throws Exception {
     this.hostname = hostname;
     this.warehouseDirectory = "/user/hive/" + UUID.randomUUID();
@@ -131,12 +135,30 @@ public abstract class MetastoreServerRunner {
     return this.port;
   }
 
+  /**
+   * Get the temporary path.
+   *
+   * @return the temporary path
+   */
+  public String getTemporaryPath() {
+    return temporaryPath;
+  }
+
+  /**
+   * Set the temporary path.
+   *
+   * @param temporaryPath the temporary path to use
+   */
+  public void setTemporaryPath(String temporaryPath) {
+    this.temporaryPath = temporaryPath;
+  }
+
   private void printConfig() {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try {
-      config.logVars(new PrintStream(baos.toString("UTF-8"), "UTF-8"));
+      config.logVars(new PrintStream(baos, false, "UTF-8"));
       LOG.debug("Hive server runner configuration:\n" + baos.toString("UTF-8"));
-    } catch (UnsupportedEncodingException |FileNotFoundException e) {
+    } catch (UnsupportedEncodingException e) {
       LOG.warn("Error to print the Hive server runner configuration.", e);
     }
   }
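
The printConfig() change above fixes a genuine bug, not just style: PrintStream(String, String) treats its first argument as a file name (hence the FileNotFoundException import that is now removed), so the old code tried to open a file named after the buffer's contents instead of writing into the buffer. A small standalone illustration of the corrected idiom, plain JDK with nothing Sqoop-specific assumed:

  import java.io.ByteArrayOutputStream;
  import java.io.PrintStream;
  import java.io.UnsupportedEncodingException;

  public class CaptureToBuffer {
    public static void main(String[] args) throws UnsupportedEncodingException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      // Correct: wrap the stream itself so the bytes land in the in-memory buffer.
      PrintStream out = new PrintStream(baos, false, "UTF-8");
      out.println("hello");
      out.flush();
      System.out.println(baos.toString("UTF-8"));  // prints "hello"
      // Broken variant: new PrintStream(baos.toString("UTF-8"), "UTF-8") would
      // interpret the (empty) buffer contents as a file name to create.
    }
  }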

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/main/java/org/apache/sqoop/test/infrastructure/SqoopTestCase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/infrastructure/SqoopTestCase.java b/test/src/main/java/org/apache/sqoop/test/infrastructure/SqoopTestCase.java
index 2c6a537..71cdc22 100644
--- a/test/src/main/java/org/apache/sqoop/test/infrastructure/SqoopTestCase.java
+++ b/test/src/main/java/org/apache/sqoop/test/infrastructure/SqoopTestCase.java
@@ -20,19 +20,23 @@ package org.apache.sqoop.test.infrastructure;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
 import org.apache.sqoop.client.SqoopClient;
 import org.apache.sqoop.client.SubmissionCallback;
+import org.apache.sqoop.common.test.asserts.ProviderAsserts;
 import org.apache.sqoop.common.test.db.DatabaseProvider;
 import org.apache.sqoop.common.test.db.TableName;
+import org.apache.sqoop.common.test.kafka.TestUtil;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.model.MPersistableEntity;
 import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.submission.SubmissionStatus;
+import org.apache.sqoop.test.asserts.HdfsAsserts;
 import org.apache.sqoop.test.data.Cities;
 import org.apache.sqoop.test.data.ShortStories;
 import org.apache.sqoop.test.data.UbuntuReleases;
@@ -45,6 +49,7 @@ import org.apache.sqoop.test.kdc.KdcRunner;
 import org.apache.sqoop.test.utils.HdfsUtils;
 import org.apache.sqoop.test.utils.SqoopUtils;
 import org.apache.sqoop.validation.Status;
+import org.testng.Assert;
 import org.testng.ITest;
 import org.testng.ITestContext;
 import org.testng.ITestNGMethod;
@@ -52,16 +57,22 @@ import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.BeforeSuite;
 
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Method;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import kafka.message.MessageAndMetadata;
+
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.toText;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotSame;
 
 /**
  * Use Infrastructure annotation to boot up miniclusters.
@@ -98,12 +109,16 @@ public class SqoopTestCase implements ITest {
 
   private static String suiteName;
 
-  private String methodName;
+  protected String methodName;
 
   private SqoopClient client;
 
   private DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
 
+  protected FileSystem hdfsClient;
+
+  protected DatabaseProvider provider;
+
   @BeforeSuite
   public static void findSuiteName(ITestContext context) {
     suiteName = context.getSuite().getName();
@@ -269,11 +284,9 @@ public class SqoopTestCase implements ITest {
    * @param partitionColumn
    */
   public void fillRdbmsFromConfig(MJob job, String partitionColumn) {
-    DatabaseProvider provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance();
-
     MConfigList fromConfig = job.getFromJobConfig();
-    fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName().getTableName()));
-    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName(partitionColumn));
+    fromConfig.getStringInput("fromJobConfig.tableName").setValue(getTableName().getTableName());
+    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(partitionColumn);
   }
 
   /**
@@ -281,10 +294,8 @@ public class SqoopTestCase implements ITest {
    * @param job
    */
   public void fillRdbmsToConfig(MJob job) {
-    DatabaseProvider provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance();
-
     MConfigList toConfig = job.getToJobConfig();
-    toConfig.getStringInput("toJobConfig.tableName").setValue(provider.escapeTableName(getTableName().getTableName()));
+    toConfig.getStringInput("toJobConfig.tableName").setValue(getTableName().getTableName());
   }
 
   /**
@@ -318,6 +329,12 @@ public class SqoopTestCase implements ITest {
     toConfig.getStringInput("toJobConfig.outputDirectory").setValue(getMapreduceDirectory());
   }
 
+  public void fillHdfsLink(MLink link) {
+    MConfigList configs = link.getConnectorLinkConfig();
+    configs.getStringInput("linkConfig.confDir").setValue(
+        (getInfrastructureProvider(SqoopInfrastructureProvider.class)).getInstance().getConfigurationPath());
+  }
+
   public String getSqoopServerUrl() {
     if (getInfrastructureProvider(SqoopInfrastructureProvider.class) == null) {
       return null;
@@ -335,11 +352,8 @@ public class SqoopTestCase implements ITest {
     return authToken;
   }
 
-  /**
-   * Create a sqoop client
-   */
   @BeforeMethod
-  public void initSqoopClient() throws Exception {
+  public void init() throws Exception {
     String serverUrl = getSqoopServerUrl();
 
     if (serverUrl != null) {
@@ -351,6 +365,15 @@ public class SqoopTestCase implements ITest {
         kdcProvider.getInstance().authenticateWithSqoopServer(new URL(serverUrl), authToken);
       }
     }
+
+    if (getInfrastructureProvider(HadoopInfrastructureProvider.class) != null) {
+      hdfsClient = FileSystem.get(getInfrastructureProvider(HadoopInfrastructureProvider.class).getHadoopConfiguration());
+      hdfsClient.delete(new Path(getMapreduceDirectory()), true);
+    }
+
+    if (getInfrastructureProvider(DatabaseInfrastructureProvider.class) != null) {
+      provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance();
+    }
   }
 
   /**
@@ -390,6 +413,16 @@ public class SqoopTestCase implements ITest {
   }
 
   /**
+   * Run the given job.
+   *
+   * @param job Job object
+   * @throws Exception
+   */
+  protected void executeJob(MJob job) throws Exception {
+    executeJob(job.getName());
+  }
+
+  /**
    * Fetch table name to be used by this test.
    * @return TableName
    */
@@ -493,4 +526,108 @@ public class SqoopTestCase implements ITest {
       getClient().deleteLink(link.getName());
     }
   }
+
+  /**
+   * Assert that the execution has generated the following lines.
+   *
+   * As the lines can be spread across multiple files, the ordering does not
+   * make a difference.
+   *
+   * @param lines
+   * @throws IOException
+   */
+  protected void assertTo(String... lines) throws IOException {
+    // TODO(VB): fix this to not depend directly on HDFS/MR
+    HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(), lines);
+  }
+
+  /**
+   * Verify number of TO files.
+   *
+   * @param expectedFiles Expected number of files
+   */
+  protected void assertToFiles(int expectedFiles) throws IOException {
+    // TODO(VB): fix this to not depend directly on HDFS/MR
+    HdfsAsserts.assertMapreduceOutputFiles(hdfsClient, getMapreduceDirectory(), expectedFiles);
+  }
+
+  /**
+   * Assert a row in the testing table.
+   *
+   * @param conditions Conditions in the form expected by the database provider
+   * @param values Values that are expected in the table (with corresponding types)
+   */
+  protected void assertRow(Object[] conditions, Object ...values) {
+    DatabaseProvider provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance();
+    ProviderAsserts.assertRow(provider, getTableName(), conditions, values);
+  }
+
+  /**
+   * Assert row in table "cities".
+   *
+   * @param values Values that are expected
+   */
+  protected void assertRowInCities(Object... values) {
+    assertRow(new Object[]{"id", values[0]}, values);
+  }
+
+  /**
+   * Create FROM file with specified content.
+   *
+   * @param filename Input file name
+   * @param lines Individual lines that should be written into the file
+   * @throws IOException
+   */
+  protected void createFromFile(String filename, String...lines) throws IOException {
+    createFromFile(hdfsClient, filename, lines);
+  }
+
+  /**
+   * Create a file on the given HDFS instance with the given lines.
+   */
+  protected void createFromFile(FileSystem hdfsClient, String filename, String...lines) throws IOException {
+    HdfsUtils.createFile(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), filename), lines);
+  }
+
+  /**
+   * Create table cities.
+   */
+  protected void createTableCities() {
+    DatabaseProvider provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance();
+    new Cities(provider, getTableName()).createTables();
+  }
+
+  protected void fillKafkaLinkConfig(MLink link) {
+    MConfigList configs = link.getConnectorLinkConfig();
+    configs.getStringInput("linkConfig.brokerList").setValue(TestUtil.getInstance().getKafkaServerUrl());
+    configs.getStringInput("linkConfig.zookeeperConnect").setValue(TestUtil.getInstance().getZkUrl());
+
+  }
+
+  protected void fillKafkaToConfig(MJob job, String topic){
+    MConfigList toConfig = job.getToJobConfig();
+    toConfig.getStringInput("toJobConfig.topic").setValue(topic);
+    List<String> topics = new ArrayList<String>(1);
+    topics.add(topic);
+    TestUtil.getInstance().initTopicList(topics);
+  }
+
+  /**
+   * Compare the strings in content to the messages in the given Kafka topic.
+   * @param content
+   * @param topic
+   * @throws UnsupportedEncodingException
+   */
+  protected void validateContent(String[] content, String topic) throws UnsupportedEncodingException {
+
+    Set<String> inputSet = new HashSet<String>(Arrays.asList(content));
+    Set<String> outputSet = new HashSet<String>();
+
+    for(int i = 0; i < content.length; i++) {
+      MessageAndMetadata<byte[],byte[]> fetchedMsg =
+          TestUtil.getInstance().getNextMessageFromConsumer(topic);
+      outputSet.add(toText(new String(fetchedMsg.message(), "UTF-8")));
+    }
+
+    Assert.assertEquals(inputSet, outputSet);
+  }
 }
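
With the assert helpers consolidated into SqoopTestCase, verifying either direction of a transfer becomes a one-liner. A short sketch, assuming the usual cities fixture (the exact row format is illustrative):

  // After a job writing TO HDFS:
  assertTo("1,'USA','2004-10-23','San Francisco'");  // lines may span files, any order
  assertToFiles(4);                                  // expected number of output files

  // After a job writing TO the database; the first form is shorthand for the second:
  assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
  assertRow(new Object[]{"id", 1}, 1, "USA", "2004-10-23", "San Francisco");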

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/main/java/org/apache/sqoop/test/infrastructure/providers/HiveInfrastructureProvider.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/infrastructure/providers/HiveInfrastructureProvider.java b/test/src/main/java/org/apache/sqoop/test/infrastructure/providers/HiveInfrastructureProvider.java
new file mode 100644
index 0000000..0db2891
--- /dev/null
+++ b/test/src/main/java/org/apache/sqoop/test/infrastructure/providers/HiveInfrastructureProvider.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.test.infrastructure.providers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.test.db.HiveProvider;
+import org.apache.sqoop.test.hive.HiveServerRunner;
+import org.apache.sqoop.test.hive.HiveServerRunnerFactory;
+import org.apache.sqoop.test.hive.InternalHiveServerRunner;
+import org.apache.sqoop.test.hive.InternalMetastoreServerRunner;
+import org.apache.sqoop.test.hive.MetastoreServerRunner;
+import org.apache.sqoop.test.hive.MetastoreServerRunnerFactory;
+import org.apache.sqoop.test.kdc.KdcRunner;
+import org.apache.sqoop.test.utils.HdfsUtils;
+
+public class HiveInfrastructureProvider extends InfrastructureProvider {
+
+  private static final Logger LOG = Logger.getLogger(HiveInfrastructureProvider.class);
+
+  private HiveServerRunner hiveServerRunner;
+  private MetastoreServerRunner metastoreServerRunner;
+  private HiveProvider hiveProvider;
+
+  private FileSystem hdfsClient;
+
+  public HiveInfrastructureProvider() {
+    try {
+      metastoreServerRunner = MetastoreServerRunnerFactory.getRunner(System.getProperties(), InternalMetastoreServerRunner.class);
+      hiveServerRunner = HiveServerRunnerFactory.getRunner(System.getProperties(), InternalHiveServerRunner.class);
+    } catch (Exception e) {
+      LOG.error("Error fetching Hadoop runner.", e);
+    }
+  }
+
+  @Override
+  public void start() {
+    try {
+      LOG.info("Starting Metastore Server: " + metastoreServerRunner.getClass().getName());
+      metastoreServerRunner.start();
+
+      LOG.info("Starting Hive Server: " + hiveServerRunner.getClass().getName());
+      hiveServerRunner.start();
+
+      LOG.info("Starting Hive Provider: " + HiveProvider.class.getName());
+      hiveProvider = new HiveProvider(hiveServerRunner.getUrl());
+      hiveProvider.start();
+    } catch (Exception e) {
+      LOG.error("Error starting hive runner.", e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    try {
+      if (hiveProvider != null) {
+        LOG.info("Stopping Hive Provider: " + hiveProvider.getClass().getName());
+        hiveProvider.stop();
+      }
+
+      if (hiveServerRunner != null) {
+        LOG.info("Stopping Hive Server: " + hiveServerRunner.getClass().getName());
+        hiveServerRunner.stop();
+      }
+
+      if (metastoreServerRunner != null) {
+        LOG.info("Stopping Metastore Server: " + metastoreServerRunner.getClass().getName());
+        metastoreServerRunner.stop();
+      }
+    } catch(Exception e) {
+      LOG.error("Could not stop hive runner.", e);
+    }
+  }
+
+  @Override
+  public void setHadoopConfiguration(Configuration conf) {
+    try {
+      hdfsClient = FileSystem.get(conf);
+      String databasePath = HdfsUtils.joinPathFragments(getRootPath(), "metastore_db");
+      metastoreServerRunner.setConfiguration(metastoreServerRunner.prepareConfiguration(conf));
+      metastoreServerRunner.getConfiguration().set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
+          "jdbc:derby:;databaseName=" + databasePath + ";create=true");
+      ensureWarehouseDirectory(metastoreServerRunner.getConfiguration());
+      hiveServerRunner.setConfiguration(hiveServerRunner.prepareConfiguration(metastoreServerRunner.getConfiguration()));
+    } catch (Exception e) {
+      LOG.error("Could not set configuration.", e);
+    }
+  }
+
+  @Override
+  public Configuration getHadoopConfiguration() {
+    return hiveServerRunner.getConfiguration();
+  }
+
+  @Override
+  public void setRootPath(String path) {
+    metastoreServerRunner.setTemporaryPath(path);
+  }
+
+  @Override
+  public String getRootPath() {
+    return metastoreServerRunner.getTemporaryPath();
+  }
+
+  @Override
+  public void setKdc(KdcRunner kdc) {
+    // Do nothing for the time being. Need to handle this when we support a Kerberos-enabled MiniCluster.
+  }
+
+  public MetastoreServerRunner getHiveMetastore() {
+    return metastoreServerRunner;
+  }
+
+  public HiveServerRunner getHiveServer() {
+    return hiveServerRunner;
+  }
+
+  public HiveProvider getHiveProvider() {
+    return hiveProvider;
+  }
+
+  private void ensureWarehouseDirectory(Configuration conf) throws Exception {
+    String warehouseDirectory = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
+    StringBuilder dir = new StringBuilder();
+    for (String part : warehouseDirectory.split("/")) {
+      dir.append(part).append("/");
+      Path path = new Path(dir.toString());
+      if (!hdfsClient.exists(path)) {
+        hdfsClient.mkdirs(path);
+      }
+    }
+    hdfsClient.setPermission(new Path(dir.toString()), new FsPermission((short)01777));
+  }
+}
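
Tests that list HiveInfrastructureProvider in their @Infrastructure dependencies reach the running services through getInfrastructureProvider, as the updated FromRDBMSToKiteHiveTest does further below. For example:

  // Inside a SqoopTestCase subclass that declares HiveInfrastructureProvider:
  HiveInfrastructureProvider hive =
      getInfrastructureProvider(HiveInfrastructureProvider.class);
  String authority = hive.getHiveMetastore().getAuthority();  // e.g. for a Kite link
  HiveProvider hiveProvider = hive.getHiveProvider();         // e.g. for ProviderAsserts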

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/main/java/org/apache/sqoop/test/infrastructure/providers/KafkaInfrastructureProvider.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/infrastructure/providers/KafkaInfrastructureProvider.java b/test/src/main/java/org/apache/sqoop/test/infrastructure/providers/KafkaInfrastructureProvider.java
new file mode 100644
index 0000000..e2a32d7
--- /dev/null
+++ b/test/src/main/java/org/apache/sqoop/test/infrastructure/providers/KafkaInfrastructureProvider.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.test.infrastructure.providers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.test.kafka.TestUtil;
+import org.apache.sqoop.test.kdc.KdcRunner;
+
+public class KafkaInfrastructureProvider extends InfrastructureProvider {
+
+  private static final Logger LOG = Logger.getLogger(KafkaInfrastructureProvider.class);
+
+  private static TestUtil testUtil = TestUtil.getInstance();
+  protected String topic;
+
+  @Override
+  public void start() {
+    // starts Kafka server and its dependent zookeeper
+    try {
+      testUtil.prepare();
+    } catch (Exception e) {
+      LOG.error("Error starting kafka.", e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    try {
+      testUtil.tearDown();
+    } catch (IOException e) {
+      LOG.error("Error stopping kafka.", e);
+    }
+  }
+
+  @Override
+  public void setHadoopConfiguration(Configuration conf) {
+    // do nothing
+  }
+
+  @Override
+  public Configuration getHadoopConfiguration() {
+    return null;
+  }
+
+  @Override
+  public void setRootPath(String path) {
+    // do nothing
+  }
+
+  @Override
+  public String getRootPath() {
+    return null;
+  }
+
+  @Override
+  public void setKdc(KdcRunner kdc) {
+    // do nothing
+  }
+
+}
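
The Kafka lifecycle moves here from the deleted KafkaConnectorTestCase (below), while the config and validation helpers now live in SqoopTestCase. A hedged sketch of a TO-Kafka job using them; the connector names, the fillRdbmsLinkConfig helper, the topic name, and the data row are assumptions not shown in this diff:

  MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
  fillRdbmsLinkConfig(rdbmsLink);                 // assumed helper for the JDBC link
  saveLink(rdbmsLink);

  MLink kafkaLink = getClient().createLink("kafka-connector");
  fillKafkaLinkConfig(kafkaLink);
  saveLink(kafkaLink);

  MJob job = getClient().createJob(rdbmsLink.getName(), kafkaLink.getName());
  fillRdbmsFromConfig(job, "id");
  fillKafkaToConfig(job, "example_topic");        // hypothetical topic name
  saveJob(job);

  executeJob(job);
  validateContent(new String[]{"1,'USA','San Francisco'"}, "example_topic");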

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/main/java/org/apache/sqoop/test/testcases/HiveConnectorTestCase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/HiveConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/HiveConnectorTestCase.java
deleted file mode 100644
index 162adf8..0000000
--- a/test/src/main/java/org/apache/sqoop/test/testcases/HiveConnectorTestCase.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.test.testcases;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.test.db.HiveProvider;
-import org.apache.sqoop.test.hive.InternalHiveServerRunner;
-import org.apache.sqoop.test.hive.HiveServerRunner;
-import org.apache.sqoop.test.hive.HiveServerRunnerFactory;
-import org.apache.sqoop.test.hive.InternalMetastoreServerRunner;
-import org.apache.sqoop.test.hive.MetastoreServerRunner;
-import org.apache.sqoop.test.hive.MetastoreServerRunnerFactory;
-import org.apache.sqoop.test.utils.HdfsUtils;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-
-public class HiveConnectorTestCase extends ConnectorTestCase {
-  private static final Logger LOG = Logger.getLogger(HiveConnectorTestCase.class);
-
-  protected HiveServerRunner hiveServerRunner;
-  protected MetastoreServerRunner metastoreServerRunner;
-  protected HiveProvider hiveProvider;
-
-  private void ensureWarehouseDirectory(Configuration conf) throws Exception {
-    String warehouseDirectory = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
-    StringBuilder dir = new StringBuilder();
-    for (String part : warehouseDirectory.split("/")) {
-      dir.append(part).append("/");
-      Path path = new Path(dir.toString());
-      if (!hdfsClient.exists(path)) {
-        hdfsClient.mkdirs(path);
-      }
-    }
-    hdfsClient.setPermission(new Path(dir.toString()), new FsPermission((short)01777));
-  }
-
-  @BeforeMethod(alwaysRun = true)
-  public void startHive() throws Exception {
-    String databasePath = HdfsUtils.joinPathFragments(getTemporaryPath(), "metastore_db");
-    metastoreServerRunner = MetastoreServerRunnerFactory.getRunner(System.getProperties(), InternalMetastoreServerRunner.class);
-    metastoreServerRunner.setConfiguration(metastoreServerRunner.prepareConfiguration(hadoopCluster.getConfiguration()));
-    metastoreServerRunner.getConfiguration().set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
-        "jdbc:derby:;databaseName=" + databasePath + ";create=true");
-    ensureWarehouseDirectory(metastoreServerRunner.getConfiguration());
-    LOG.info("Starting Metastore Server: " + metastoreServerRunner.getClass().getName());
-    metastoreServerRunner.start();
-
-    hiveServerRunner = HiveServerRunnerFactory.getRunner(System.getProperties(), InternalHiveServerRunner.class);
-    hiveServerRunner.setConfiguration(hiveServerRunner.prepareConfiguration(metastoreServerRunner.getConfiguration()));
-    LOG.info("Starting Hive Server: " + hiveServerRunner.getClass().getName());
-    hiveServerRunner.start();
-
-    LOG.info("Starting Hive Provider: " + provider.getClass().getName());
-    hiveProvider = new HiveProvider(hiveServerRunner.getUrl());
-    hiveProvider.start();
-  }
-
-  @AfterMethod(alwaysRun = true)
-  public void stopHive() throws Exception {
-    if (hiveProvider != null) {
-      LOG.info("Stopping Hive Provider: " + provider.getClass().getName());
-      hiveProvider.stop();
-    }
-
-    if (hiveServerRunner != null) {
-      LOG.info("Stopping Hive Server: " + hiveServerRunner.getClass().getName());
-      hiveServerRunner.stop();
-    }
-
-    if (metastoreServerRunner != null) {
-      LOG.info("Stopping Metastore Server: " + metastoreServerRunner.getClass().getName());
-      metastoreServerRunner.stop();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java
deleted file mode 100644
index f15c07e..0000000
--- a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.test.testcases;
-
-import kafka.message.MessageAndMetadata;
-import org.apache.sqoop.model.MConfigList;
-import org.apache.sqoop.model.MJob;
-import org.apache.sqoop.model.MLink;
-import org.testng.Assert;
-import org.apache.sqoop.common.test.kafka.TestUtil;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Arrays;
-
-import static org.apache.sqoop.connector.common.SqoopIDFUtils.toText;
-
-public class KafkaConnectorTestCase extends ConnectorTestCase {
-  private static TestUtil testUtil = TestUtil.getInstance();
-  protected String topic;
-
-  @BeforeClass(alwaysRun = true)
-  public void startKafka() throws Exception {
-    // starts Kafka server and its dependent zookeeper
-    testUtil.prepare();
-  }
-
-  @AfterClass(alwaysRun = true)
-  public void stopKafka() throws IOException {
-    testUtil.tearDown();
-  }
-
-  protected void fillKafkaLinkConfig(MLink link) {
-    MConfigList configs = link.getConnectorLinkConfig();
-    configs.getStringInput("linkConfig.brokerList").setValue(testUtil.getKafkaServerUrl());
-    configs.getStringInput("linkConfig.zookeeperConnect").setValue(testUtil.getZkUrl());
-
-  }
-
-  protected void fillKafkaToConfig(MJob job){
-    MConfigList toConfig = job.getToJobConfig();
-    toConfig.getStringInput("toJobConfig.topic").setValue(topic);
-    List<String> topics = new ArrayList<String>(1);
-    topics.add(topic);
-    testUtil.initTopicList(topics);
-  }
-
-  /**
-   * Compare strings in content to the messages in Kafka topic
-   * @param content
-   * @throws UnsupportedEncodingException
-   */
-  protected void validateContent(String[] content) throws UnsupportedEncodingException {
-
-    Set<String> inputSet = new HashSet<String>(Arrays.asList(content));
-    Set<String> outputSet = new HashSet<String>();
-
-    for(String str: content) {
-      MessageAndMetadata<byte[],byte[]> fetchedMsg =
-              testUtil.getNextMessageFromConsumer(topic);
-      outputSet.add(toText(new String(fetchedMsg.message(), "UTF-8")));
-    }
-
-    Assert.assertEquals(inputSet, outputSet);
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java
index fa660d5..6885525 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java
@@ -17,17 +17,20 @@
  */
 package org.apache.sqoop.integration.connector.hdfs;
 
-import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.testng.annotations.Test;
 
-/**
- */
-public class AppendModeTest extends ConnectorTestCase {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class AppendModeTest extends SqoopTestCase {
 
   @Test
   public void test() throws Exception {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java
index c953131..c6ce1e8 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java
@@ -23,14 +23,19 @@ import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
 import org.apache.sqoop.test.asserts.HdfsAsserts;
-import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.apache.sqoop.test.utils.HdfsUtils;
 import org.testng.annotations.Test;
 
 /**
  * Test schemaless to schemaless transfer by using two hdfs connectors
  */
-public class FromHDFSToHDFSTest extends ConnectorTestCase {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class})
+public class FromHDFSToHDFSTest extends SqoopTestCase {
 
   @Test
   public void test() throws Exception {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java
index d48c6d2..37306e2 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java
@@ -20,7 +20,12 @@ package org.apache.sqoop.integration.connector.hdfs;
 import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -29,7 +34,8 @@ import java.sql.Timestamp;
 
 import static org.testng.Assert.assertEquals;
 
-public class HdfsIncrementalReadTest extends ConnectorTestCase {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class HdfsIncrementalReadTest extends SqoopTestCase {
 
   @BeforeMethod(alwaysRun = true)
   public void createTable() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/InformalJobNameExecuteTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/InformalJobNameExecuteTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/InformalJobNameExecuteTest.java
index cd2ed6c..0e14b7a 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/InformalJobNameExecuteTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/InformalJobNameExecuteTest.java
@@ -21,14 +21,20 @@ package org.apache.sqoop.integration.connector.hdfs;
 import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.testng.annotations.*;
 
 import java.sql.Timestamp;
 
 import static org.testng.Assert.assertEquals;
 
-public class InformalJobNameExecuteTest extends ConnectorTestCase {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class InformalJobNameExecuteTest extends SqoopTestCase {
 
   private String jobName;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
index 2759b42..e5e3d26 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
@@ -19,14 +19,17 @@ package org.apache.sqoop.integration.connector.hdfs;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.sqoop.client.ClientError;
-import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.error.code.HdfsConnectorError;
-import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -34,9 +37,8 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
-/**
- */
-public class OutputDirectoryTest extends ConnectorTestCase {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class OutputDirectoryTest extends SqoopTestCase {
   @Test
   public void testOutputDirectoryIsAFile() throws Exception {
     createAndLoadTableCities();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
index f98870d..c857699 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
@@ -25,7 +25,12 @@ import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
 import org.apache.sqoop.test.asserts.HdfsAsserts;
-import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.testng.SkipException;
 import org.testng.annotations.Test;
 
@@ -42,7 +47,8 @@ import static org.testng.Assert.assertEquals;
  *   -Dorg.apache.sqoop.integration.connector.hdfs.s3.access=AKI...
  *   -Dorg.apache.sqoop.integration.connector.hdfs.s3.secret=93JKx...
  */
-public class S3Test extends ConnectorTestCase {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class S3Test extends SqoopTestCase {
 
   public static final String PROPERTY_BUCKET = "org.apache.sqoop.integration.connector.hdfs.s3.bucket";
   public static final String PROPERTY_ACCESS = "org.apache.sqoop.integration.connector.hdfs.s3.access";

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/hive/FromRDBMSToKiteHiveTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hive/FromRDBMSToKiteHiveTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hive/FromRDBMSToKiteHiveTest.java
index e964422..ec9f733 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hive/FromRDBMSToKiteHiveTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hive/FromRDBMSToKiteHiveTest.java
@@ -18,16 +18,20 @@
 package org.apache.sqoop.integration.connector.hive;
 
 import org.apache.sqoop.common.test.asserts.ProviderAsserts;
-import org.apache.sqoop.common.test.db.DatabaseProvider;
-import org.apache.sqoop.common.test.db.DatabaseProviderFactory;
+import org.apache.sqoop.common.test.db.HiveProvider;
 import org.apache.sqoop.common.test.db.TableName;
 import org.apache.sqoop.connector.common.FileFormat;
 import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.test.testcases.HiveConnectorTestCase;
-import org.testng.ITest;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HiveInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -38,14 +42,14 @@ import java.lang.reflect.Method;
 import java.util.List;
 
 @Test(groups = {"slow", "no-real-cluster"})
-public class FromRDBMSToKiteHiveTest extends HiveConnectorTestCase implements ITest {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, HiveInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class FromRDBMSToKiteHiveTest extends SqoopTestCase {
   private String testName;
 
   private FileFormat fileFormat;
 
   private MLink rdbmsLink;
   private MLink kiteLink;
-  private String hiveTableName;
 
   @Factory(dataProvider="rdbms-to-kite-hive-test")
   public FromRDBMSToKiteHiveTest(FileFormat fileFormat) {
@@ -54,7 +58,6 @@ public class FromRDBMSToKiteHiveTest extends HiveConnectorTestCase implements IT
 
   @DataProvider(name="rdbms-to-kite-hive-test", parallel=true)
   public static Object[][] data() throws Exception {
-    DatabaseProvider provider = DatabaseProviderFactory.getProvider(System.getProperties());
     return new Object[][]{
         {FileFormat.AVRO},
         {FileFormat.PARQUET}
@@ -99,7 +102,7 @@ public class FromRDBMSToKiteHiveTest extends HiveConnectorTestCase implements IT
     // Kite link
     kiteLink = getClient().createLink("kite-connector");
     kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.authority")
-        .setValue(metastoreServerRunner.getAuthority());
+        .setValue(getInfrastructureProvider(HiveInfrastructureProvider.class).getHiveMetastore().getAuthority());
     saveLink(kiteLink);
   }
 
@@ -126,6 +129,7 @@ public class FromRDBMSToKiteHiveTest extends HiveConnectorTestCase implements IT
     saveJob(job);
     executeJob(job);
 
+    HiveProvider hiveProvider = getInfrastructureProvider(HiveInfrastructureProvider.class).getHiveProvider();
     // Assert correct output
     ProviderAsserts.assertRow(hiveProvider, new TableName(getHiveTableName()), new Object[]{"id", 1}, "1");
     ProviderAsserts.assertRow(hiveProvider, new TableName(getHiveTableName()), new Object[]{"id", 2}, "2");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/AllTypesTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/AllTypesTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/AllTypesTest.java
index ac5a61a..9c0ee84 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/AllTypesTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/AllTypesTest.java
@@ -27,9 +27,13 @@ import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.apache.sqoop.test.utils.ParametrizedUtils;
-import org.testng.ITest;
 import org.testng.ITestContext;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Factory;
@@ -43,7 +47,8 @@ import static org.testng.Assert.assertEquals;
  * Test transfer of all supported data types.
  */
 @Test(groups = "slow")
-public class AllTypesTest extends ConnectorTestCase implements ITest {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class AllTypesTest extends SqoopTestCase {
 
   private static String testName;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
index 07eaba1..933bc08 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
@@ -17,13 +17,15 @@
  */
 package org.apache.sqoop.integration.connector.jdbc.generic;
 
-import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MJob;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -32,10 +34,8 @@ import java.sql.Timestamp;
 
 import static org.testng.Assert.assertEquals;
 
-/**
- *
- */
-public class FromHDFSToRDBMSTest extends ConnectorTestCase {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class FromHDFSToRDBMSTest extends SqoopTestCase {
   @BeforeMethod(alwaysRun = true)
   public void createTable() {
     createTableCities();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
index 4cb0edc..7e66091 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
@@ -22,7 +22,12 @@ import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.testng.annotations.Test;
 
 import java.util.List;
@@ -30,7 +35,8 @@ import java.util.List;
 /**
  * Import simple table with various configurations.
  */
-public class FromRDBMSToHDFSTest extends ConnectorTestCase {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class FromRDBMSToHDFSTest extends SqoopTestCase {
 
   @Test
   public void testCities() throws Exception {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
index 0c04fd9..83012eb 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
@@ -23,22 +23,23 @@ import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.apache.sqoop.test.utils.ParametrizedUtils;
-import org.testng.ITest;
 import org.testng.ITestContext;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Method;
-
 import static org.testng.Assert.assertEquals;
 
-/**
- */
-public class IncrementalReadTest extends ConnectorTestCase implements ITest {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class IncrementalReadTest extends SqoopTestCase {
 
   public static Object[] COLUMNS = new Object [][] {
     //       column -                last value -               new max value

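IncrementalReadTest and PartitionerTest keep TestNG's factory-plus-data-provider wiring; dropping the ITest interface only removes the custom test-name plumbing and does not change how parameter rows are fed in. A hedged sketch of that wiring (class, method, and row values here are hypothetical; the real tests build their rows from COLUMNS via ParametrizedUtils, whose signature is not shown in this diff):

    import org.testng.ITestContext;
    import org.testng.annotations.DataProvider;
    import org.testng.annotations.Factory;
    import org.testng.annotations.Test;

    public class ParameterizedSketch {
      private final String column;

      // TestNG instantiates one instance of the class per provider row.
      @Factory(dataProvider = "incremental-read")
      public ParameterizedSketch(String column) {
        this.column = column;
      }

      @DataProvider(name = "incremental-read")
      public static Object[][] rows(ITestContext context) {
        // Illustrative rows only; the real provider derives them from COLUMNS.
        return new Object[][] { {"id"}, {"code_name"} };
      }

      @Test
      public void testWithColumn() {
        // Run the incremental read against this.column.
      }
    }
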
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/PartitionerTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/PartitionerTest.java
index 8129c6a..e5e886e 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/PartitionerTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/PartitionerTest.java
@@ -18,15 +18,17 @@
 package org.apache.sqoop.integration.connector.jdbc.generic;
 
 import com.google.common.collect.Iterables;
-import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MJob;
-import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.apache.sqoop.test.utils.ParametrizedUtils;
-import org.testng.ITest;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
@@ -35,7 +37,8 @@ import org.testng.annotations.Test;
  *
  */
 @Test(groups = "slow")
-public class PartitionerTest extends ConnectorTestCase implements ITest {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class PartitionerTest extends SqoopTestCase {
 
   /**
    * Columns that we will use as partition column with maximal number of

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
index 70b6eff..890fc10 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
@@ -19,21 +19,23 @@ package org.apache.sqoop.integration.connector.jdbc.generic;
 
 import static org.testng.Assert.assertEquals;
 
-import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.test.db.TableName;
 import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
 import org.apache.sqoop.test.data.Cities;
-import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.testng.annotations.Test;
 
 import java.sql.Timestamp;
 
-/**
- *
- */
-public class TableStagedRDBMSTest extends ConnectorTestCase {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class TableStagedRDBMSTest extends SqoopTestCase {
 
   @Test
   public void testStagedTransfer() throws Exception {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java
index ea8c7b9..5e349d1 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java
@@ -20,11 +20,17 @@ package org.apache.sqoop.integration.connector.kafka;
 import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.test.testcases.KafkaConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KafkaInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "no-real-cluster")
-public class FromHDFSToKafkaTest extends KafkaConnectorTestCase {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, KafkaInfrastructureProvider.class, SqoopInfrastructureProvider.class})
+public class FromHDFSToKafkaTest extends SqoopTestCase {
 
   public static final String[] input = {
           "A BIRD came down the walk:",
@@ -32,9 +38,10 @@ public class FromHDFSToKafkaTest extends KafkaConnectorTestCase {
           "He bit an angle-worm in halves",
           "And ate the fellow raw."
   };
+

   @Test
-  public void testBasic() throws Exception {
-    topic = getTestName();
+  public void testFromHDFSToKafka() throws Exception {
+    String topic = getTestName();
 
     createFromFile("input-0001",input);
 
@@ -53,7 +60,7 @@ public class FromHDFSToKafkaTest extends KafkaConnectorTestCase {
 
     // Job connector configs
     fillHdfsFromConfig(job);
-    fillKafkaToConfig(job);
+    fillKafkaToConfig(job, topic);
 
     // driver config
     MDriverConfig driverConfig = job.getDriverConfig();
@@ -63,7 +70,7 @@ public class FromHDFSToKafkaTest extends KafkaConnectorTestCase {
     executeJob(job);
 
     // this will assert the content of the array matches the content of the topic
-    validateContent(input);
+    validateContent(input, topic);
   }
 
 

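Because SqoopTestCase no longer holds the topic as an instance field, each Kafka test now derives the topic from its own test name and passes it explicitly. A minimal sketch of the new call shape, assuming the fillKafkaToConfig(MJob, String) and validateContent(String[], String) overloads this patch introduces (prepareJob() is a hypothetical stand-in for the link and job creation shown in the full test above):

    @Test
    public void testKafkaSketch() throws Exception {
      String topic = getTestName();          // one topic per test method
      createFromFile("input-0001", input);   // stage the input on HDFS

      MJob job = prepareJob();               // hypothetical stand-in for the
                                             // link/job creation shown above
      fillHdfsFromConfig(job);
      fillKafkaToConfig(job, topic);         // topic is an explicit argument now
      saveJob(job);
      executeJob(job);

      validateContent(input, topic);         // compare topic contents to input
    }
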
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java
index a34378a..9ae1334 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java
@@ -17,16 +17,20 @@
  */
 package org.apache.sqoop.integration.connector.kafka;
 
-import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.test.testcases.KafkaConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KafkaInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "no-real-cluster")
-public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, DatabaseInfrastructureProvider.class, KafkaInfrastructureProvider.class, SqoopInfrastructureProvider.class})
+public class FromRDBMSToKafkaTest extends SqoopTestCase {
 
   private static final String[] input = {
     "1,'USA','2004-10-23 00:00:00.000','San Francisco'",
@@ -36,8 +40,8 @@ public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase {
   };
 
   @Test
-  public void testBasic() throws Exception {
-    topic = getTestName();
+  public void testFromRDBMSToKafka() throws Exception {
+    String topic = getTestName();
 
     createAndLoadTableCities();
 
@@ -58,7 +62,7 @@ public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase {
     fillRdbmsFromConfig(job, "id");
 
     // set Kafka "TO" job config
-    fillKafkaToConfig(job);
+    fillKafkaToConfig(job, topic);
 
     // driver config
     MDriverConfig driverConfig = job.getDriverConfig();
@@ -68,7 +72,7 @@ public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase {
     executeJob(job);
 
     // this will assert the content of the array matches the content of the topic
-    validateContent(input);
+    validateContent(input, topic);
   }
 
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8d63df71/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
index be9fef1..10f3614 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
@@ -22,7 +22,12 @@ import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.apache.sqoop.test.utils.HdfsUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -31,7 +36,8 @@ import org.testng.annotations.Test;
 import java.util.List;
 
 @Test
-public class FromRDBMSToKiteTest extends ConnectorTestCase {
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class FromRDBMSToKiteTest extends SqoopTestCase {
   @BeforeMethod(alwaysRun = true)
   public void createTable() {
     createAndLoadTableCities();
@@ -51,7 +57,7 @@ public class FromRDBMSToKiteTest extends ConnectorTestCase {
    */
   @Override
   public String getMapreduceDirectory() {
-    return HdfsUtils.joinPathFragments(hadoopCluster.getTestDirectory(), getClass().getName(), "namespace", getTestName()).replaceAll("/$", "");
+    return HdfsUtils.joinPathFragments(getInfrastructureProvider(HadoopInfrastructureProvider.class).getInstance().getTestDirectory(), getClass().getName(), "namespace", getTestName()).replaceAll("/$", "");
   }
 
   @Test
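
Direct use of base-class fields such as hadoopCluster (and the getCluster() accessor) gives way to the generic lookup: a test asks for the provider type it declared in @Infrastructure and takes the running resource from it. Both lookups below are lifted from this file's hunks; the String return types are assumptions based on how the values are used:

    // scratch directory of the running Hadoop (mini)cluster
    String testDir = getInfrastructureProvider(HadoopInfrastructureProvider.class)
        .getInstance().getTestDirectory();

    // configuration directory of the running Sqoop server
    String confDir = getInfrastructureProvider(SqoopInfrastructureProvider.class)
        .getInstance().getConfigurationPath();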
@@ -64,7 +70,8 @@ public class FromRDBMSToKiteTest extends ConnectorTestCase {
     // Kite link
     MLink kiteLink = getClient().createLink("kite-connector");
     kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.authority").setValue(hdfsClient.getUri().getAuthority());
-    kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.confDir").setValue(getCluster().getConfigurationPath());
+    kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.confDir").setValue(
+        getInfrastructureProvider(SqoopInfrastructureProvider.class).getInstance().getConfigurationPath());
     saveLink(kiteLink);
 
     // Job creation