You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/10 05:07:24 UTC

[32/50] [abbrv] git commit: SQOOP-1545: Sqoop2: From/To: Add supported directions to Repository

SQOOP-1545: Sqoop2: From/To: Add supported directions to Repository

(Abraham Elmahrek via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/fc74316f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/fc74316f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/fc74316f

Branch: refs/heads/sqoop2
Commit: fc74316fbdcee80272106064ac4e1e747c300b97
Parents: b600036
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Thu Oct 9 08:36:35 2014 -0700
Committer: Abraham Elmahrek <ab...@elmahrek.com>
Committed: Thu Oct 9 18:22:51 2014 -0700

----------------------------------------------------------------------
 .../sqoop/common/SupportedDirections.java       |  79 +++-
 .../sqoop/common/TestSupportedDirections.java   |  19 +
 .../sqoop/repository/derby/DerbyRepoError.java  |   8 +
 .../derby/DerbyRepositoryHandler.java           | 398 ++++++++++++++++---
 .../repository/derby/DerbySchemaConstants.java  |  53 +++
 .../repository/derby/DerbySchemaQuery.java      | 164 +++++++-
 .../sqoop/repository/derby/DerbyTestCase.java   | 131 ++++--
 .../repository/derby/TestConnectorHandling.java |  62 ++-
 .../sqoop/repository/derby/TestJobHandling.java |   4 -
 .../sqoop/shell/ShowConnectorFunction.java      |  30 +-
 .../sqoop/connector/spi/SqoopConnector.java     |   2 +-
 11 files changed, 808 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/fc74316f/common/src/main/java/org/apache/sqoop/common/SupportedDirections.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/common/SupportedDirections.java b/common/src/main/java/org/apache/sqoop/common/SupportedDirections.java
index 25ba276..c527117 100644
--- a/common/src/main/java/org/apache/sqoop/common/SupportedDirections.java
+++ b/common/src/main/java/org/apache/sqoop/common/SupportedDirections.java
@@ -20,7 +20,9 @@ package org.apache.sqoop.common;
 /**
  * Represents which Directions are supported.
  */
-public class SupportedDirections {
+public class SupportedDirections implements Comparable<SupportedDirections> {
+  private static final char SUPPORTED_DIRECTIONS_SEPARATOR = '/';
+
   private boolean from;
   private boolean to;
 
@@ -38,4 +40,79 @@ public class SupportedDirections {
     return direction == Direction.FROM && from
         || direction == Direction.TO && to;
   }
+
+  /**
+   * @return String "FROM", "TO", "FROM/TO", "".
+   */
+  public String toString() {
+    StringBuffer buffer = new StringBuffer();
+
+    if (isDirectionSupported(Direction.FROM)) {
+      buffer.append(Direction.FROM);
+
+      if (isDirectionSupported(Direction.TO)) {
+        buffer.append(SUPPORTED_DIRECTIONS_SEPARATOR);
+        buffer.append(Direction.TO);
+      }
+    } else if (isDirectionSupported(Direction.TO)) {
+      buffer.append(Direction.TO);
+    }
+
+    return buffer.toString();
+  }
+
+  public static SupportedDirections fromString(String supportedDirections) {
+    boolean from = false, to = false;
+
+    if (supportedDirections != null && !supportedDirections.equals("")) {
+      for (String direction : supportedDirections.split("/")) {
+        switch (Direction.valueOf(direction)) {
+          case FROM:
+            from = true;
+            break;
+
+          case TO:
+            to = true;
+            break;
+        }
+      }
+    }
+
+    return new SupportedDirections(from, to);
+  }
+
+  public static SupportedDirections fromDirection(Direction direction) {
+    boolean from = false, to = false;
+    switch (direction) {
+      case FROM:
+        from = true;
+        break;
+
+      case TO:
+        to = true;
+        break;
+    }
+    return new SupportedDirections(from, to);
+  }
+
+  @Override
+  public int compareTo(SupportedDirections o) {
+    int hash = 0;
+    if (this.isDirectionSupported(Direction.FROM)) {
+      hash |= 1;
+    }
+    if (this.isDirectionSupported(Direction.TO)) {
+      hash |= 2;
+    }
+
+    int oHash = 0;
+    if (this.isDirectionSupported(Direction.FROM)) {
+      oHash |= 1;
+    }
+    if (this.isDirectionSupported(Direction.TO)) {
+      oHash |= 2;
+    }
+
+    return hash - oHash;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fc74316f/common/src/test/java/org/apache/sqoop/common/TestSupportedDirections.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/common/TestSupportedDirections.java b/common/src/test/java/org/apache/sqoop/common/TestSupportedDirections.java
index 4fbaf82..4f0cdd6 100644
--- a/common/src/test/java/org/apache/sqoop/common/TestSupportedDirections.java
+++ b/common/src/test/java/org/apache/sqoop/common/TestSupportedDirections.java
@@ -52,4 +52,23 @@ public class TestSupportedDirections {
     Assert.assertFalse(
         supportedDirections.isDirectionSupported(Direction.TO));
   }
+
+  @Test
+  public void testToString() {
+    // Both
+    SupportedDirections supportedDirections = new SupportedDirections(true, true);
+    Assert.assertEquals("FROM/TO", supportedDirections.toString());
+
+    // FROM
+    supportedDirections = new SupportedDirections(true, false);
+    Assert.assertEquals("FROM", supportedDirections.toString());
+
+    // TO
+    supportedDirections = new SupportedDirections(false, true);
+    Assert.assertEquals("TO", supportedDirections.toString());
+
+    // NONE
+    supportedDirections = new SupportedDirections(false, false);
+    Assert.assertEquals("", supportedDirections.toString());
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fc74316f/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
index cc31d06..0f0f7c4 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
@@ -181,6 +181,14 @@ public enum DerbyRepoError implements ErrorCode {
   DERBYREPO_0044("Update of driver config failed"),
 
   DERBYREPO_0045("Can't retrieve all connectors"),
+
+  DERBYREPO_0046("Could not add directions"),
+
+  DERBYREPO_0047("Could not get ID of recently added direction"),
+
+  DERBYREPO_0048("Could not register config direction"),
+
+  DERBYREPO_0049("Could not set connector direction")
             ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fc74316f/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index 73d8387..10a7b1a 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -33,13 +33,16 @@ import java.util.Date;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.DirectionError;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.common.SupportedDirections;
 import org.apache.sqoop.connector.ConnectorHandler;
 import org.apache.sqoop.connector.ConnectorManagerUtils;
 import org.apache.sqoop.model.MBooleanInput;
@@ -88,6 +91,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    */
   private static final String CONNECTOR_HDFS = "hdfs-connector";
 
+  private static final String LINK_HDFS = "hdfs-link";
+
   private JdbcRepositoryContext repoContext;
 
   /**
@@ -121,7 +126,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
       // Register the job config type, since driver config is per job
       registerConfigs(null, null, mDriver.getDriverConfig().getConfigs(),
-        MConfigType.JOB.name(), baseConfigStmt, baseInputStmt);
+        MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
 
     } catch (SQLException ex) {
       throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mDriver.toString(), ex);
@@ -150,14 +155,17 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
       // Register link type config
       registerConfigs(connectorId, null, mc.getLinkConfig().getConfigs(),
-        MConfigType.LINK.name(), baseConfigStmt, baseInputStmt);
+        MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn);
 
       // Register both from/to job type config
-      registerConfigs(connectorId, Direction.FROM, mc.getConfig(Direction.FROM).getConfigs(),
-        MConfigType.JOB.name(), baseConfigStmt, baseInputStmt);
-      registerConfigs(connectorId, Direction.TO, mc.getConfig(Direction.TO).getConfigs(),
-        MConfigType.JOB.name(), baseConfigStmt, baseInputStmt);
-
+      if (mc.getSupportedDirections().isDirectionSupported(Direction.FROM)) {
+        registerConfigs(connectorId, Direction.FROM, mc.getConfig(Direction.FROM).getConfigs(),
+            MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
+      }
+      if (mc.getSupportedDirections().isDirectionSupported(Direction.TO)) {
+        registerConfigs(connectorId, Direction.TO, mc.getConfig(Direction.TO).getConfigs(),
+            MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
+      }
     } catch (SQLException ex) {
       throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
         mc.toString(), ex);
@@ -167,6 +175,34 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
   }
 
+  private void insertConnectorDirection(Long connectorId, Direction direction, Connection conn)
+      throws SQLException {
+    PreparedStatement stmt = null;
+
+    try {
+      stmt = conn.prepareStatement(STMT_INSERT_SQ_CONNECTOR_DIRECTIONS);
+      stmt.setLong(1, connectorId);
+      stmt.setLong(2, getDirection(direction, conn));
+
+      if (stmt.executeUpdate() != 1) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0049);
+      }
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  private void insertConnectorDirections(Long connectorId, SupportedDirections directions, Connection conn)
+      throws SQLException {
+    if (directions.isDirectionSupported(Direction.FROM)) {
+      insertConnectorDirection(connectorId, Direction.FROM, conn);
+    }
+
+    if (directions.isDirectionSupported(Direction.TO)) {
+      insertConnectorDirection(connectorId, Direction.TO, conn);
+    }
+  }
+
   private long getConnectorId(MConnector mc, Connection conn) {
     PreparedStatement baseConnectorStmt = null;
     try {
@@ -187,6 +223,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       if (!rsetConnectorId.next()) {
         throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
       }
+
+      insertConnectorDirections(rsetConnectorId.getLong(1),
+          mc.getSupportedDirections(), conn);
+
       return rsetConnectorId.getLong(1);
     } catch (SQLException ex) {
       throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
@@ -399,6 +439,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     }
     if(version <= 3) {
       // Schema modifications
+      runQuery(QUERY_CREATE_TABLE_SQ_DIRECTION, conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION, conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK, conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK, conn);
@@ -412,6 +453,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         updteJobInternals(conn, registerHdfsConnector(conn));
       }
 
+      // Change direction from VARCHAR to BIGINT + foreign key.
+      updateDirections(conn, insertDirections(conn));
+
       // Wait to remove SQB_TYPE (IMPORT/EXPORT) until we update data.
       // Data updates depend on knowledge of the type of job.
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn);
@@ -443,6 +487,110 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
   }
 
   /**
+   * Insert directions: FROM and TO.
+   * @param conn
+   * @return Map<Direction, Long> direction ID => Direction
+   */
+  protected Map<Direction, Long> insertDirections(Connection conn) {
+    // Add directions
+    Map<Direction, Long> directionMap = new TreeMap<Direction, Long>();
+    PreparedStatement insertDirectionStmt = null;
+    try {
+      // Insert directions and get IDs.
+      for (Direction direction : Direction.values()) {
+        insertDirectionStmt = conn.prepareStatement(STMT_INSERT_DIRECTION, Statement.RETURN_GENERATED_KEYS);
+        insertDirectionStmt.setString(1, direction.toString());
+        if (insertDirectionStmt.executeUpdate() != 1) {
+          throw new SqoopException(DerbyRepoError.DERBYREPO_0046, "Could not add directions FROM and TO.");
+        }
+
+        ResultSet directionId = insertDirectionStmt.getGeneratedKeys();
+        if (directionId.next()) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Loaded direction: " + directionId.getLong(1));
+          }
+
+          directionMap.put(direction, directionId.getLong(1));
+        } else {
+          throw new SqoopException(DerbyRepoError.DERBYREPO_0047, "Could not get ID of direction " + direction);
+        }
+      }
+    } catch (SQLException e) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
+    } finally {
+      closeStatements(insertDirectionStmt);
+    }
+
+    return directionMap;
+  }
+
+  /**
+   * Add normalized M2M for SQ_CONNECTOR and SQ_CONFIG for Direction.
+   * 1. Remember all ID => direction for configs.
+   * 2. Drop SQF_DIRECTION (varhchar).
+   * 3. Add new M2M tables for SQ_CONNECTOR and SQ_CONFIG.
+   * 4. Add directions via updating SQ_CONFIG with proper Direction IDs.
+   * 5. Make sure all connectors have all supported directions.
+   * @param conn
+   */
+  protected void updateDirections(Connection conn, Map<Direction, Long> directionMap) {
+    // Remember directions
+    Statement fetchFormsStmt = null,
+              fetchConnectorsStmt = null;
+    List<Long> connectorIds = new LinkedList<Long>();
+    List<Long> configIds = new LinkedList<Long>();
+    List<String> directions = new LinkedList<String>();
+    try {
+      fetchFormsStmt = conn.createStatement();
+      ResultSet rs = fetchFormsStmt.executeQuery(STMT_FETCH_CONFIG_DIRECTIONS);
+      while (rs.next()) {
+        configIds.add(rs.getLong(1));
+        directions.add(rs.getString(2));
+      }
+      rs.close();
+    } catch (SQLException e) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
+    } finally {
+      closeStatements(fetchFormsStmt);
+    }
+
+    // Change Schema
+    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR, conn);
+    runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS, conn);
+    runQuery(QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS, conn);
+
+    // Add directions back
+    while (!configIds.isEmpty() && !directions.isEmpty()) {
+      Long configId = configIds.remove(0);
+      String directionString = directions.remove(0);
+      if (directionString != null && !directionString.isEmpty()) {
+        Direction direction = Direction.valueOf(directionString);
+        runQuery(STMT_INSERT_SQ_CONFIG_DIRECTIONS, conn, configId, directionMap.get(direction));
+      }
+    }
+
+    // Add connector directions
+    try {
+      fetchConnectorsStmt = conn.createStatement();
+      ResultSet rs = fetchConnectorsStmt.executeQuery(STMT_SELECT_CONNECTOR_ALL);
+      while (rs.next()) {
+        connectorIds.add(rs.getLong(1));
+      }
+      rs.close();
+    } catch (SQLException e) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
+    } finally {
+      closeStatements(fetchConnectorsStmt);
+    }
+
+    for (Long connectorId : connectorIds) {
+      for (Long directionId : directionMap.values()) {
+        runQuery(STMT_INSERT_SQ_CONNECTOR_DIRECTIONS, conn, connectorId, directionId);
+      }
+    }
+  }
+
+  /**
    * Upgrade job data from IMPORT/EXPORT to FROM/TO.
    * Since the framework is no longer responsible for HDFS,
    * the HDFS connector/link must be added.
@@ -509,13 +657,13 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_DRIVER_INDEX, conn,
         new Long(0), "throttling");
 
-    MLink hdfsLink = createHdfsLink(conn);
+    Long linkId = createHdfsLink(conn, connectorId);
     runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_LINK_COPY_SQB_FROM_LINK, conn,
         "EXPORT");
     runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_LINK, conn,
-        new Long(hdfsLink.getPersistenceId()), "EXPORT");
+        new Long(linkId), "EXPORT");
     runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_LINK, conn,
-        new Long(hdfsLink.getPersistenceId()), "IMPORT");
+        new Long(linkId), "IMPORT");
 
     runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_NAME, conn,
         "fromJobConfig", "table", Direction.FROM.toString());
@@ -556,7 +704,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
         try {
           PreparedStatement baseConnectorStmt = conn.prepareStatement(
-              STMT_INSERT_CONNECTOR_BASE,
+              STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS,
               Statement.RETURN_GENERATED_KEYS);
           baseConnectorStmt.setString(1, handler.getMetadata().getUniqueName());
           baseConnectorStmt.setString(2, handler.getMetadata().getClassName());
@@ -588,22 +736,46 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    *
    * NOTE: Upgrade path only!
    */
-  private MLink createHdfsLink(Connection conn) {
+  private Long createHdfsLink(Connection conn, Long connectorId) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Creating HDFS link.");
     }
 
-    MConnector hdfsConnector = this.findConnector(CONNECTOR_HDFS, conn);
-    MLink hdfsLink = new MLink(
-        hdfsConnector.getPersistenceId(),
-        hdfsConnector.getLinkConfig());
-    this.createLink(hdfsLink, conn);
+    PreparedStatement stmt = null;
+    int result;
+    try {
+      stmt = conn.prepareStatement(STMT_INSERT_LINK,
+          Statement.RETURN_GENERATED_KEYS);
+      stmt.setString(1, LINK_HDFS);
+      stmt.setLong(2, connectorId);
+      stmt.setBoolean(3, true);
+      stmt.setNull(4, Types.VARCHAR);
+      stmt.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
+      stmt.setNull(6, Types.VARCHAR);
+      stmt.setTimestamp(7, new Timestamp(System.currentTimeMillis()));
+
+      result = stmt.executeUpdate();
+      if (result != 1) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0012,
+            Integer.toString(result));
+      }
 
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Created HDFS link.");
-    }
+      ResultSet rsetConnectionId = stmt.getGeneratedKeys();
 
-    return hdfsLink;
+      if (!rsetConnectionId.next()) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
+      }
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Created HDFS link.");
+      }
+
+      return rsetConnectionId.getLong(1);
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0019, ex);
+    } finally {
+      closeStatements(stmt);
+    }
   }
 
   /**
@@ -695,7 +867,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
       // Register a driver config as a job type with no owner/connector and direction
       registerConfigs(null/* owner*/, null /*direction*/, mDriver.getDriverConfig().getConfigs(),
-        MConfigType.JOB.name(), baseConfigStmt, baseInputStmt);
+        MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
 
       // We're using hardcoded value for driver config as they are
       // represented as NULL in the database.
@@ -1744,13 +1916,13 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
       Counters counters = new Counters();
 
-      while(rs.next()) {
+      while (rs.next()) {
         String groupName = rs.getString(1);
         String counterName = rs.getString(2);
         long value = rs.getLong(3);
 
         CounterGroup group = counters.getCounterGroup(groupName);
-        if(group == null) {
+        if (group == null) {
           group = new CounterGroup(groupName);
           counters.addCounterGroup(group);
         }
@@ -1758,7 +1930,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         group.addCounter(new Counter(counterName, value));
       }
 
-      if(counters.isEmpty()) {
+      if (counters.isEmpty()) {
         return null;
       } else {
         return counters;
@@ -1769,7 +1941,83 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     }
   }
 
-  private List<MConnector> loadConnectors(PreparedStatement stmt,Connection conn) throws SQLException {
+  private Long getDirection(Direction direction, Connection conn) throws SQLException {
+    PreparedStatement directionStmt = null;
+    ResultSet rs = null;
+
+    try {
+      directionStmt = conn.prepareStatement(STMT_SELECT_SQD_ID_BY_SQD_NAME);
+      directionStmt.setString(1, direction.toString());
+      rs = directionStmt.executeQuery();
+
+      rs.next();
+      return rs.getLong(1);
+    } finally {
+      if (rs != null) {
+        closeResultSets(rs);
+      }
+      if (directionStmt != null) {
+        closeStatements(directionStmt);
+      }
+    }
+  }
+
+  private Direction getDirection(long directionId, Connection conn) throws SQLException {
+    PreparedStatement directionStmt = null;
+    ResultSet rs = null;
+
+    try {
+      directionStmt = conn.prepareStatement(STMT_SELECT_SQD_NAME_BY_SQD_ID);
+      directionStmt.setLong(1, directionId);
+      rs = directionStmt.executeQuery();
+
+      rs.next();
+      return Direction.valueOf(rs.getString(1));
+    } finally {
+      if (rs != null) {
+        closeResultSets(rs);
+      }
+      if (directionStmt != null) {
+        closeStatements(directionStmt);
+      }
+    }
+  }
+
+  private SupportedDirections findConnectorSupportedDirections(long connectorId, Connection conn) throws SQLException {
+    PreparedStatement connectorDirectionsStmt = null;
+    ResultSet rs = null;
+
+    boolean from = false, to = false;
+
+    try {
+      connectorDirectionsStmt = conn.prepareStatement(STMT_SELECT_SQ_CONNECTOR_DIRECTIONS);
+      connectorDirectionsStmt.setLong(1, connectorId);
+      rs = connectorDirectionsStmt.executeQuery();
+
+      while(rs.next()) {
+        switch(getDirection(rs.getLong(2), conn)) {
+          case FROM:
+            from = true;
+            break;
+
+          case TO:
+            to = true;
+            break;
+        }
+      }
+    } finally {
+      if (rs != null) {
+        closeResultSets(rs);
+      }
+      if (connectorDirectionsStmt != null) {
+        closeStatements(connectorDirectionsStmt);
+      }
+    }
+
+    return new SupportedDirections(from, to);
+  }
+
+  private List<MConnector> loadConnectors(PreparedStatement stmt, Connection conn) throws SQLException {
     List<MConnector> connectors = new ArrayList<MConnector>();
     ResultSet rsConnectors = null;
     PreparedStatement connectorConfigFetchStmt = null;
@@ -1792,13 +2040,21 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         List<MConfig> fromConfig = new ArrayList<MConfig>();
         List<MConfig> toConfig = new ArrayList<MConfig>();
 
-        loadConfigTypes(linkConfig, fromConfig, toConfig,
-            connectorConfigFetchStmt, connectorConfigInputFetchStmt, 1);
+        loadConfigTypes(linkConfig, fromConfig, toConfig, connectorConfigFetchStmt,
+            connectorConfigInputFetchStmt, 1, conn);
 
+        SupportedDirections supportedDirections
+            = findConnectorSupportedDirections(connectorId, conn);
+        MFromConfig fromJobConfig = null;
+        MToConfig toJobConfig = null;
+        if (supportedDirections.isDirectionSupported(Direction.FROM)) {
+          fromJobConfig = new MFromConfig(fromConfig);
+        }
+        if (supportedDirections.isDirectionSupported(Direction.TO)) {
+          toJobConfig = new MToConfig(toConfig);
+        }
         MConnector mc = new MConnector(connectorName, connectorClassName, connectorVersion,
-                                       new MLinkConfig(linkConfig),
-                                       new MFromConfig(fromConfig),
-                                       new MToConfig(toConfig));
+                                       new MLinkConfig(linkConfig), fromJobConfig, toJobConfig);
         mc.setPersistenceId(connectorId);
 
         connectors.add(mc);
@@ -1845,7 +2101,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         List<MConfig> toConfig = new ArrayList<MConfig>();
 
         loadConfigTypes(connectorLinkConfig, fromConfig, toConfig, connectorConfigFetchStatement,
-            connectorConfigInputStatement, 2);
+            connectorConfigInputStatement, 2, conn);
         MLink link = new MLink(connectorId, new MLinkConfig(connectorLinkConfig));
 
         link.setPersistenceId(id);
@@ -1911,7 +2167,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         List<MConfig> fromConnectorToJobConfig = new ArrayList<MConfig>();
 
         loadConfigTypes(fromConnectorLinkConfig, fromConnectorFromJobConfig, fromConnectorToJobConfig,
-            fromConfigFetchStmt, jobInputFetchStmt, 2);
+            fromConfigFetchStmt, jobInputFetchStmt, 2, conn);
 
         // TO entity configs
         List<MConfig> toConnectorLinkConfig = new ArrayList<MConfig>();
@@ -1922,7 +2178,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         List<MConfig> driverConfig = new ArrayList<MConfig>();
 
         loadConfigTypes(toConnectorLinkConfig, toConnectorFromJobConfig, toConnectorToJobConfig,
-            toConfigFetchStmt, jobInputFetchStmt, 2);
+            toConfigFetchStmt, jobInputFetchStmt, 2, conn);
 
         loadDriverConfigs(driverConfig, driverConfigfetchStmt, jobInputFetchStmt, 2);
 
@@ -1951,6 +2207,21 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     return jobs;
   }
 
+  private void registerConfigDirection(Long configId, Direction direction, Connection conn)
+      throws SQLException {
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(STMT_INSERT_SQ_CONFIG_DIRECTIONS);
+      stmt.setLong(1, configId);
+      stmt.setLong(2, getDirection(direction, conn));
+      if (stmt.executeUpdate() != 1) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0048);
+      }
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
   /**
    * Register configs in derby database. This method will insert the ids
    * generated by the repository into the configs passed in itself.
@@ -1962,12 +2233,13 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * @param type
    * @param baseConfigStmt
    * @param baseInputStmt
+   * @param conn
    * @return short number of configs registered.
    * @throws SQLException
    */
   private short registerConfigs(Long connectorId, Direction direction,
       List<MConfig> configs, String type, PreparedStatement baseConfigStmt,
-      PreparedStatement baseInputStmt)
+      PreparedStatement baseInputStmt, Connection conn)
           throws SQLException {
     short configIndex = 0;
 
@@ -1977,14 +2249,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       } else {
         baseConfigStmt.setLong(1, connectorId);
       }
-      if(direction == null) {
-        baseConfigStmt.setNull(2, Types.VARCHAR);
-      } else {
-        baseConfigStmt.setString(2, direction.name());
-      }
-      baseConfigStmt.setString(3, config.getName());
-      baseConfigStmt.setString(4, type);
-      baseConfigStmt.setShort(5, configIndex++);
+
+      baseConfigStmt.setString(2, config.getName());
+      baseConfigStmt.setString(3, type);
+      baseConfigStmt.setShort(4, configIndex++);
 
       int baseConfigCount = baseConfigStmt.executeUpdate();
       if (baseConfigCount != 1) {
@@ -1999,6 +2267,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       long configId = rsetConfigId.getLong(1);
       config.setPersistenceId(configId);
 
+      if (direction != null) {
+        registerConfigDirection(configId, direction, conn);
+      }
+
       // Insert all the inputs
       List<MInput<?>> inputs = config.getInputs();
       registerConfigInputs(configId, inputs, baseInputStmt);
@@ -2071,7 +2343,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         } else if (args[i] instanceof Long) {
           stmt.setLong(i + 1, (Long) args[i]);
         } else {
-          stmt.setObject(i, args[i]);
+          stmt.setObject(i + 1, args[i]);
         }
       }
 
@@ -2115,9 +2387,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     while (rsetConfig.next()) {
       long configId = rsetConfig.getLong(1);
       Long fromConnectorId = rsetConfig.getLong(2);
-      String configName = rsetConfig.getString(4);
-      String configTYpe = rsetConfig.getString(5);
-      int configIndex = rsetConfig.getInt(6);
+      String configName = rsetConfig.getString(3);
+      String configTYpe = rsetConfig.getString(4);
+      int configIndex = rsetConfig.getInt(5);
       List<MInput<?>> configInputs = new ArrayList<MInput<?>>();
 
       MConfig mDriverConfig = new MConfig(configName, configInputs);
@@ -2211,6 +2483,26 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     }
   }
 
+  private Direction findConfigDirection(long configId, Connection conn) throws SQLException {
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+
+    try {
+      stmt = conn.prepareStatement(STMT_SELECT_SQ_CONFIG_DIRECTIONS);
+      stmt.setLong(1, configId);
+      rs = stmt.executeQuery();
+      rs.next();
+      return getDirection(rs.getLong(2), conn);
+    } finally {
+      if (rs != null) {
+        closeResultSets(rs);
+      }
+      if (stmt != null) {
+        closeStatements(stmt);
+      }
+    }
+  }
+
   /**
    * Load configs and corresponding inputs from Derby database.
    *
@@ -2222,21 +2514,21 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * @param toConfig TO job configs that will be filled up
    * @param configFetchStmt Prepared statement for fetching configs
    * @param inputFetchStmt Prepare statement for fetching inputs
+   * @param conn Connection object that is used to find config direction.
    * @throws SQLException In case of any failure on Derby side
    */
   public void loadConfigTypes(List<MConfig> linkConfig, List<MConfig> fromConfig,
       List<MConfig> toConfig, PreparedStatement configFetchStmt, PreparedStatement inputFetchStmt,
-      int configPosition) throws SQLException {
+      int configPosition, Connection conn) throws SQLException {
 
     // Get list of structures from database
     ResultSet rsetConfig = configFetchStmt.executeQuery();
     while (rsetConfig.next()) {
       long configId = rsetConfig.getLong(1);
       Long configConnectorId = rsetConfig.getLong(2);
-      String operation = rsetConfig.getString(3);
-      String configName = rsetConfig.getString(4);
-      String configType = rsetConfig.getString(5);
-      int configIndex = rsetConfig.getInt(6);
+      String configName = rsetConfig.getString(3);
+      String configType = rsetConfig.getString(4);
+      int configIndex = rsetConfig.getInt(5);
       List<MInput<?>> configInputs = new ArrayList<MInput<?>>();
 
       MConfig config = new MConfig(configName, configInputs);
@@ -2324,7 +2616,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
           linkConfig.add(config);
           break;
         case JOB:
-          Direction type = Direction.valueOf(operation);
+          Direction type = findConfigDirection(configId, conn);
           List<MConfig> jobConfigs;
           switch(type) {
             case FROM:

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fc74316f/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
index ad749ed..cf6e657 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
@@ -41,6 +41,17 @@ public final class DerbySchemaConstants {
 
   public static final String COLUMN_SQM_VALUE = "SQM_VALUE";
 
+  // SQ_DIRECTION
+
+  public static final String TABLE_SQ_DIRECTION_NAME = "SQ_DIRECTION";
+
+  public static final String TABLE_SQ_DIRECTION = SCHEMA_PREFIX
+      + TABLE_SQ_DIRECTION_NAME;
+
+  public static final String COLUMN_SQD_ID = "SQD_ID";
+
+  public static final String COLUMN_SQD_NAME = "SQD_NAME";
+
   // SQ_CONNECTOR
 
   public static final String TABLE_SQ_CONNECTOR_NAME = "SQ_CONNECTOR";
@@ -56,6 +67,27 @@ public final class DerbySchemaConstants {
 
   public static final String COLUMN_SQC_VERSION = "SQC_VERSION";
 
+  // SQ_CONNECTOR_DIRECTIONS
+
+  public static final String TABLE_SQ_CONNECTOR_DIRECTIONS_NAME = "SQ_CONNECTOR_DIRECTIONS";
+
+  public static final String TABLE_SQ_CONNECTOR_DIRECTIONS = SCHEMA_PREFIX
+      + TABLE_SQ_CONNECTOR_DIRECTIONS_NAME;
+
+  public static final String COLUMN_SQCD_ID = "SQCD_ID";
+
+  public static final String COLUMN_SQCD_CONNECTOR = "SQCD_CONNECTOR";
+
+  public static final String COLUMN_SQCD_DIRECTION = "SQCD_DIRECTION";
+
+  public static final String CONSTRAINT_SQCD_SQC_NAME = CONSTRAINT_PREFIX + "SQCD_SQC";
+
+  public static final String CONSTRAINT_SQCD_SQC = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQC_NAME;
+
+  public static final String CONSTRAINT_SQCD_SQD_NAME = CONSTRAINT_PREFIX + "SQCD_SQD";
+
+  public static final String CONSTRAINT_SQCD_SQD = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQD_NAME;
+
   // SQ_CONFIG
 
   public static final String TABLE_SQ_CONFIG_NAME = "SQ_CONFIG";
@@ -81,6 +113,27 @@ public final class DerbySchemaConstants {
 
   public static final String CONSTRAINT_SQ_CFG_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_SQC_NAME;
 
+  // SQ_CONFIG_DIRECTIONS
+
+  public static final String TABLE_SQ_CONFIG_DIRECTIONS_NAME = "SQ_CONFIG_DIRECTIONS";
+
+  public static final String TABLE_SQ_CONFIG_DIRECTIONS = SCHEMA_PREFIX
+      + TABLE_SQ_CONFIG_DIRECTIONS_NAME;
+
+  public static final String COLUMN_SQ_CFG_DIR_ID = "SQ_CFG_DIR_ID";
+
+  public static final String COLUMN_SQ_CFG_DIR_CONFIG = "SQ_CFG_DIR_CONFIG";
+
+  public static final String COLUMN_SQ_CFG_DIR_DIRECTION = "SQ_CFG_DIR_DIRECTION";
+
+  public static final String CONSTRAINT_SQ_CFG_DIR_CONFIG_NAME = CONSTRAINT_PREFIX + "SQ_CFG_DIR_CONFIG";
+
+  public static final String CONSTRAINT_SQ_CFG_DIR_CONFIG = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_DIR_CONFIG_NAME;
+
+  public static final String CONSTRAINT_SQ_CFG_DIR_DIRECTION_NAME = CONSTRAINT_PREFIX + "SQ_CFG_DIR_DIRECTION";
+
+  public static final String CONSTRAINT_SQ_CFG_DIR_DIRECTION = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_DIR_DIRECTION_NAME;
+
   // SQ_INPUT
 
   public static final String TABLE_SQ_INPUT_NAME = "SQ_INPUT";

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fc74316f/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
index 478afe2..56ea147 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
@@ -35,31 +35,65 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  * </pre>
  * </p>
  * <p>
+ * <strong>SQ_DIRECTION</strong>: Directions.
+ * <pre>
+ *    +---------------------------------------+
+ *    | SQ_DIRECTION                          |
+ *    +---------------------------------------+
+ *    | SQD_ID: BIGINT PK AUTO-GEN            |
+ *    | SQD_NAME: VARCHAR(64)                 | "FROM"|"TO"
+ *    +---------------------------------------+
+ * </pre>
+ * </p>
+ * <p>
  * <strong>SQ_CONNECTOR</strong>: Connector registration.
  * <pre>
- *    +----------------------------+
- *    | SQ_CONNECTOR               |
- *    +----------------------------+
- *    | SQC_ID: BIGINT PK AUTO-GEN |
- *    | SQC_NAME: VARCHAR(64)      |
- *    | SQC_CLASS: VARCHAR(255)    |
- *    | SQC_VERSION: VARCHAR(64)   |
- *    +----------------------------+
+ *    +-----------------------------+
+ *    | SQ_CONNECTOR                |
+ *    +-----------------------------+
+ *    | SQC_ID: BIGINT PK AUTO-GEN  |
+ *    | SQC_NAME: VARCHAR(64)       |
+ *    | SQC_CLASS: VARCHAR(255)     |
+ *    | SQC_VERSION: VARCHAR(64)    |
+ *    +-----------------------------+
+ * </pre>
+ * </p>
+ * <p>
+ * <strong>SQ_CONNECTOR_DIRECTIONS</strong>: Connector directions.
+ * <pre>
+ *    +------------------------------+
+ *    | SQ_CONNECTOR_DIRECTIONS      |
+ *    +------------------------------+
+ *    | SQCD_ID: BIGINT PK AUTO-GEN  |
+ *    | SQCD_CONNECTOR: BIGINT       | FK SQCD_CONNECTOR(SQC_ID)
+ *    | SQCD_DIRECTION: BIGINT       | FK SQCD_DIRECTION(SQD_ID)
+ *    +------------------------------+
  * </pre>
  * </p>
  * <p>
  * <strong>SQ_CONFIG</strong>: Config details.
  * <pre>
- *    +----------------------------------+
- *    | SQ_CONFIG                          |
- *    +----------------------------------+
+ *    +-------------------------------------+
+ *    | SQ_CONFIG                           |
+ *    +-------------------------------------+
  *    | SQ_CFG_ID: BIGINT PK AUTO-GEN       |
  *    | SQ_CFG_OWNER: BIGINT                | FK SQ_CFG_OWNER(SQC_ID),NULL for driver
- *    | SQ_CFG_DIRECTION: VARCHAR(32)       | "FROM"|"TO"|NULL
  *    | SQ_CFG_NAME: VARCHAR(64)            |
  *    | SQ_CFG_TYPE: VARCHAR(32)            | "LINK"|"JOB"
  *    | SQ_CFG_INDEX: SMALLINT              |
- *    +----------------------------------+
+ *    +-------------------------------------+
+ * </pre>
+ * </p>
+ * <p>
+ * <strong>SQ_CONFIG_DIRECTIONS</strong>: Connector directions.
+ * <pre>
+ *    +------------------------------+
+ *    | SQ_CONNECTOR_DIRECTIONS      |
+ *    +------------------------------+
+ *    | SQCD_ID: BIGINT PK AUTO-GEN  |
+ *    | SQCD_CONFIG: BIGINT          | FK SQCD_CONFIG(SQ_CFG_ID)
+ *    | SQCD_DIRECTION: BIGINT       | FK SQCD_DIRECTION(SQD_ID)
+ *    +------------------------------+
  * </pre>
  * </p>
  * <p>
@@ -118,11 +152,11 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  * <strong>SQ_LINK_INPUT</strong>: N:M relationship link and input
  * <pre>
  *    +----------------------------+
- *    | SQ_LINK_INPUT        |
+ *    | SQ_LINK_INPUT              |
  *    +----------------------------+
- *    | SQ_LNKI_LINK: BIGINT PK | FK SQ_LINK(SQ_LNK_ID)
- *    | SQ_LNKI_INPUT: BIGINT PK      | FK SQ_INPUT(SQI_ID)
- *    | SQ_LNKI_VALUE: LONG VARCHAR   |
+ *    | SQ_LNKI_LINK: BIGINT PK    | FK SQ_LINK(SQ_LNK_ID)
+ *    | SQ_LNKI_INPUT: BIGINT PK   | FK SQ_INPUT(SQI_ID)
+ *    | SQ_LNKI_VALUE: LONG VARCHAR|
  *    +----------------------------+
  * </pre>
  * </p>
@@ -212,6 +246,13 @@ public final class DerbySchemaQuery {
     + COLUMN_SQM_VALUE + " VARCHAR(64) "
     + ")";
 
+  // DDL: Create table SQ_DIRECTION
+  public static final String QUERY_CREATE_TABLE_SQ_DIRECTION =
+      "CREATE TABLE " + TABLE_SQ_DIRECTION + " ("
+      + COLUMN_SQD_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+      + COLUMN_SQD_NAME + " VARCHAR(64)"
+      + ")";
+
   // DDL: Create table SQ_CONNECTOR
   public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR =
       "CREATE TABLE " + TABLE_SQ_CONNECTOR + " ("
@@ -221,6 +262,20 @@ public final class DerbySchemaQuery {
       + COLUMN_SQC_VERSION + " VARCHAR(64) "
       + ")";
 
+  // DDL: Create table SQ_CONNECTOR_DIRECTIONS
+  public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS =
+      "CREATE TABLE " + TABLE_SQ_CONNECTOR_DIRECTIONS + " ("
+      + COLUMN_SQCD_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+      + COLUMN_SQCD_CONNECTOR + " BIGINT, "
+      + COLUMN_SQCD_DIRECTION + " BIGINT, "
+      + "CONSTRAINT " + CONSTRAINT_SQCD_SQC + " "
+        + "FOREIGN KEY (" + COLUMN_SQCD_CONNECTOR + ") "
+          + "REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + "), "
+      + "CONSTRAINT " + CONSTRAINT_SQCD_SQD + " "
+        + "FOREIGN KEY (" + COLUMN_SQCD_DIRECTION + ") "
+          + "REFERENCES " + TABLE_SQ_DIRECTION + " (" + COLUMN_SQD_ID + ")"
+      + ")";
+
   // DDL: Create table SQ_CONFIG ( It stores the configs defined by every connector), if connector is null then it is driver config
   public static final String QUERY_CREATE_TABLE_SQ_CONFIG =
       "CREATE TABLE " + TABLE_SQ_CONFIG + " ("
@@ -235,6 +290,20 @@ public final class DerbySchemaQuery {
           + "REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + ")"
       + ")";
 
+  // DDL: Create table SQ_CONFIG_DIRECTIONS
+  public static final String QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS =
+      "CREATE TABLE " + TABLE_SQ_CONFIG_DIRECTIONS + " ("
+      + COLUMN_SQ_CFG_DIR_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+      + COLUMN_SQ_CFG_DIR_CONFIG + " BIGINT, "
+      + COLUMN_SQ_CFG_DIR_DIRECTION + " BIGINT, "
+      + "CONSTRAINT " + CONSTRAINT_SQ_CFG_DIR_CONFIG + " "
+        + "FOREIGN KEY (" + COLUMN_SQ_CFG_DIR_CONFIG + ") "
+          + "REFERENCES " + TABLE_SQ_CONFIG + " (" + COLUMN_SQ_CFG_ID + "), "
+      + "CONSTRAINT " + CONSTRAINT_SQ_CFG_DIR_DIRECTION + " "
+        + "FOREIGN KEY (" + COLUMN_SQ_CFG_DIR_DIRECTION + ") "
+          + "REFERENCES " + TABLE_SQ_DIRECTION + " (" + COLUMN_SQD_ID + ")"
+      + ")";
+
   // DDL: Create table SQ_INPUT
   public static final String QUERY_CREATE_TABLE_SQ_INPUT =
       "CREATE TABLE " + TABLE_SQ_INPUT + " ("
@@ -435,6 +504,14 @@ public final class DerbySchemaQuery {
     + COLUMN_SQM_VALUE + ") "
     + "VALUES(?, ?)";
 
+  public static final String STMT_SELECT_SQD_ID_BY_SQD_NAME =
+      "SELECT " + COLUMN_SQD_ID + " FROM " + TABLE_SQ_DIRECTION
+          + " WHERE " + COLUMN_SQD_NAME + "=?";
+
+  public static final String STMT_SELECT_SQD_NAME_BY_SQD_ID =
+      "SELECT " + COLUMN_SQD_NAME + " FROM " + TABLE_SQ_DIRECTION
+          + " WHERE " + COLUMN_SQD_ID + "=?";
+
   // DML: Fetch connector Given Name
   public static final String STMT_FETCH_BASE_CONNECTOR =
       "SELECT "
@@ -459,7 +536,6 @@ public final class DerbySchemaQuery {
       "SELECT "
       + COLUMN_SQ_CFG_ID + ", "
       + COLUMN_SQ_CFG_OWNER + ", "
-      + COLUMN_SQ_CFG_DIRECTION + ", "
       + COLUMN_SQ_CFG_NAME + ", "
       + COLUMN_SQ_CFG_TYPE + ", "
       + COLUMN_SQ_CFG_INDEX
@@ -472,13 +548,12 @@ public final class DerbySchemaQuery {
       "SELECT "
       + COLUMN_SQ_CFG_ID + ", "
       + COLUMN_SQ_CFG_OWNER + ", "
-      + COLUMN_SQ_CFG_DIRECTION + ", "
       + COLUMN_SQ_CFG_NAME + ", "
       + COLUMN_SQ_CFG_TYPE + ", "
       + COLUMN_SQ_CFG_INDEX
       + " FROM " + TABLE_SQ_CONFIG
       + " WHERE " + COLUMN_SQ_CFG_OWNER + " IS NULL "
-      + " ORDER BY " + COLUMN_SQ_CFG_TYPE + ", " + COLUMN_SQ_CFG_DIRECTION  + ", " + COLUMN_SQ_CFG_INDEX;
+      + " ORDER BY " + COLUMN_SQ_CFG_TYPE + ", " + COLUMN_SQ_CFG_INDEX;
 
   // DML: Fetch inputs for a given config
   public static final String STMT_FETCH_INPUT =
@@ -544,15 +619,21 @@ public final class DerbySchemaQuery {
       + COLUMN_SQC_VERSION
       + ") VALUES (?, ?, ?)";
 
+  public static final String STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS =
+      "INSERT INTO " + TABLE_SQ_CONNECTOR + " ("
+          + COLUMN_SQC_NAME + ", "
+          + COLUMN_SQC_CLASS + ", "
+          + COLUMN_SQC_VERSION
+          + ") VALUES (?, ?, ?)";
+
   // DML: Insert config base
   public static final String STMT_INSERT_CONFIG_BASE =
       "INSERT INTO " + TABLE_SQ_CONFIG + " ("
       + COLUMN_SQ_CFG_OWNER + ", "
-      + COLUMN_SQ_CFG_DIRECTION + ", "
       + COLUMN_SQ_CFG_NAME + ", "
       + COLUMN_SQ_CFG_TYPE + ", "
       + COLUMN_SQ_CFG_INDEX
-      + ") VALUES ( ?, ?, ?, ?, ?)";
+      + ") VALUES ( ?, ?, ?, ?)";
 
   // DML: Insert config input
   public static final String STMT_INSERT_INPUT_BASE =
@@ -1058,6 +1139,45 @@ public final class DerbySchemaQuery {
       "ALTER TABLE " + TABLE_SQ_LINK + " ADD CONSTRAINT "
           + CONSTRAINT_SQ_LNK_NAME_UNIQUE + " UNIQUE (" + COLUMN_SQ_LNK_NAME + ")";
 
+  public static final String STMT_INSERT_DIRECTION = "INSERT INTO " + TABLE_SQ_DIRECTION + " "
+      + "(" + COLUMN_SQD_NAME + ") VALUES (?)";
+
+  // DML: Fetch all configs
+  public static final String STMT_FETCH_CONFIG_DIRECTIONS =
+      "SELECT "
+          + COLUMN_SQ_CFG_ID + ", "
+          + COLUMN_SQ_CFG_DIRECTION
+          + " FROM " + TABLE_SQ_CONFIG;
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR =
+      "ALTER TABLE " + TABLE_SQ_CONFIG + " DROP COLUMN " + COLUMN_SQ_CFG_DIRECTION;
+
+  public static final String STMT_INSERT_SQ_CONNECTOR_DIRECTIONS =
+      "INSERT INTO " + TABLE_SQ_CONNECTOR_DIRECTIONS + " "
+          + "(" + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION + ")"
+          + " VALUES (?, ?)";
+
+  public static final String STMT_INSERT_SQ_CONFIG_DIRECTIONS =
+      "INSERT INTO " + TABLE_SQ_CONFIG_DIRECTIONS + " "
+          + "(" + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION + ")"
+          + " VALUES (?, ?)";
+
+  public static final String STMT_SELECT_SQ_CONNECTOR_DIRECTIONS_ALL =
+      "SELECT " + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION
+          + " FROM " + TABLE_SQ_CONNECTOR_DIRECTIONS;
+
+  public static final String STMT_SELECT_SQ_CONNECTOR_DIRECTIONS =
+      STMT_SELECT_SQ_CONNECTOR_DIRECTIONS_ALL + " WHERE "
+          + COLUMN_SQCD_CONNECTOR + " = ?";
+
+  public static final String STMT_SELECT_SQ_CONFIG_DIRECTIONS_ALL =
+      "SELECT " + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION
+          + " FROM " + TABLE_SQ_CONFIG_DIRECTIONS;
+
+  public static final String STMT_SELECT_SQ_CONFIG_DIRECTIONS =
+      STMT_SELECT_SQ_CONFIG_DIRECTIONS_ALL + " WHERE "
+          + COLUMN_SQ_CFG_DIR_CONFIG + " = ?";
+
   private DerbySchemaQuery() {
     // Disable explicit object creation
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fc74316f/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
index 2da084f..9316687 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
@@ -19,10 +19,13 @@ package org.apache.sqoop.repository.derby;
 
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_SCHEMA_SQOOP;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONFIG;
+import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR;
+import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER_GROUP;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION;
+import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_DIRECTION;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_INPUT;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_JOB;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_JOB_INPUT;
@@ -31,6 +34,7 @@ import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TA
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_SUBMISSION;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_SYSTEM;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION;
+import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK;
@@ -47,6 +51,8 @@ import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_T
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER;
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_UPDATE_USER;
+import static org.apache.sqoop.repository.derby.DerbySchemaQuery.STMT_INSERT_DIRECTION;
+
 import static org.junit.Assert.assertEquals;
 
 import java.sql.Connection;
@@ -147,6 +153,7 @@ abstract public class DerbyTestCase {
     }
 
     if (version > 3) {
+      runQuery(QUERY_CREATE_TABLE_SQ_DIRECTION);
       runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK);
@@ -156,6 +163,13 @@ abstract public class DerbyTestCase {
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME);
       runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR);
+      runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS);
+      runQuery(QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS);
+
+      for (Direction direction : Direction.values()) {
+        runQuery(STMT_INSERT_DIRECTION, direction.toString());
+      }
     }
 
     runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('version', '"  + version + "')");
@@ -196,6 +210,42 @@ abstract public class DerbyTestCase {
     }
   }
 
+  /**
+   * Run single, arbitrary insert query on derby memory repository.
+   *
+   * @param query Query to execute
+   * @return Long id of newly inserted row (-1 if none).
+   * @throws Exception
+   */
+  protected Long runInsertQuery(String query, Object... args) throws Exception {
+    PreparedStatement stmt = null;
+    try {
+      stmt = getDerbyDatabaseConnection().prepareStatement(query, PreparedStatement.RETURN_GENERATED_KEYS);
+
+      for (int i = 0; i < args.length; ++i) {
+        if (args[i] instanceof String) {
+          stmt.setString(i + 1, (String)args[i]);
+        } else if (args[i] instanceof Long) {
+          stmt.setLong(i + 1, (Long)args[i]);
+        } else {
+          stmt.setString(i + 1, args[i].toString());
+        }
+      }
+
+      if (!stmt.execute()) {
+        ResultSet rs = stmt.getGeneratedKeys();
+        rs.next();
+        return rs.getLong(1);
+      }
+    } finally {
+      if (stmt != null) {
+        stmt.close();
+      }
+    }
+
+    return -1L;
+  }
+
   protected Connection getDerbyDatabaseConnection() {
     return connection;
   }
@@ -291,54 +341,59 @@ abstract public class DerbyTestCase {
   }
 
   protected void loadConnectorAndDriverConfigVersion4() throws Exception {
+    Long configId;
+
     // Connector entry
     runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
         + "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test')");
 
     for (String connector : new String[]{"1"}) {
+      // Directions
+      runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
+          + "VALUES(" + connector + ", 1)");
+      runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
+          + "VALUES(" + connector + ", 2)");
+
       // connector configs
-      for (String direction : new String[]{"null", "'FROM'", "'TO'"}) {
+      for (String direction : new String[]{null, "1", "2"}) {
 
         String type;
-        if (direction.equals("null")) {
+        if (direction == null) {
           type = "LINK";
         } else {
           type = "JOB";
         }
 
-        runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-            + "(SQ_CFG_OWNER, SQ_CFG_DIRECTION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
-            + "VALUES("
-            + connector + ", "
-            + direction
-            + ", 'C1', '"
-            + type
-            + "', 0)");
-        runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-            + "(SQ_CFG_OWNER, SQ_CFG_DIRECTION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
-            + "VALUES("
-            + connector + ", "
-            + direction
-            + ", 'C2', '"
-            + type
-            + "', 1)");
+        configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+            + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+            + "VALUES(" + connector + ", 'C1', '" + type + "', 0)");
+
+        if (direction != null) {
+          runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS"
+              + "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) "
+              + "VALUES(" + configId + ", " + direction + ")");
+        }
+
+        configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+            + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+            + "VALUES(" + connector + ", 'C2', '" + type + "', 1)");
+
+        if (direction != null) {
+          runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS"
+              + "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) "
+              + "VALUES(" + configId + ", " + direction + ")");
+        }
       }
     }
 
     // driver config
     for (String type : new String[]{"JOB"}) {
       runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-          + "(SQ_CFG_OWNER, SQ_CFG_DIRECTION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
-          + "VALUES(NULL, NULL"
-          + ", 'C1', '"
-          + type
-          + "', 0)");
+          + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+          + "VALUES(NULL" + ", 'C1', '" + type + "', 0)");
       runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-          + "(SQ_CFG_OWNER, SQ_CFG_DIRECTION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
-          + "VALUES(NULL, NULL"
-          + ", 'C2', '"
-          + type
-          + "', 1)");
+          + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+          + "VALUES(NULL" + ", 'C2', '" + type + "', 1)");
     }
 
     // Input entries
@@ -512,8 +567,10 @@ abstract public class DerbyTestCase {
    */
   public void addConnector() throws Exception {
     // Connector entry
-    runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
-            + "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test')");
+    Long connectorId = runInsertQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
+        + "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test')");
+    runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES (" + connectorId + ", 1)");
+    runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES (" + connectorId + ", 2)");
   }
 
   /**
@@ -560,8 +617,20 @@ abstract public class DerbyTestCase {
   }
 
   protected MConnector getConnector() {
+    return getConnector(true, true);
+  }
+
+  protected MConnector getConnector(boolean from, boolean to) {
+    MFromConfig fromJobForms = null;
+    MToConfig toJobForms = null;
+    if (from) {
+      fromJobForms = getFromConfig();
+    }
+    if (to) {
+      toJobForms = getToConfig();
+    }
     return new MConnector("A", "org.apache.sqoop.test.A", "1.0-test",
-      getLinkConfig(), getFromConfig(), getToConfig());
+      getLinkConfig(), fromJobForms, toJobForms);
   }
   
   protected MDriver getDriver() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fc74316f/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
index a0e8b91..fc95222 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
@@ -78,8 +78,6 @@ public class TestConnectorHandling extends DerbyTestCase {
     assertEquals(connectors.size(),2);
     assertEquals(connectors.get(0).getUniqueName(),"A");
     assertEquals(connectors.get(1).getUniqueName(),"B");
-
-
   }
 
   @Test
@@ -101,4 +99,64 @@ public class TestConnectorHandling extends DerbyTestCase {
     assertNotNull(retrieved);
     assertEquals(connector, retrieved);
   }
+
+  @Test
+  public void testFromDirection() throws Exception {
+    MConnector connector = getConnector(true, false);
+
+    handler.registerConnector(connector, getDerbyDatabaseConnection());
+
+    // Connector should get persistence ID
+    assertEquals(1, connector.getPersistenceId());
+
+    // Now check content in corresponding tables
+    assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
+    assertCountForTable("SQOOP.SQ_CONFIG", 4);
+    assertCountForTable("SQOOP.SQ_INPUT", 8);
+
+    // Registered connector should be easily recovered back
+    MConnector retrieved = handler.findConnector("A", getDerbyDatabaseConnection());
+    assertNotNull(retrieved);
+    assertEquals(connector, retrieved);
+  }
+
+  @Test
+  public void testToDirection() throws Exception {
+    MConnector connector = getConnector(false, true);
+
+    handler.registerConnector(connector, getDerbyDatabaseConnection());
+
+    // Connector should get persistence ID
+    assertEquals(1, connector.getPersistenceId());
+
+    // Now check content in corresponding tables
+    assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
+    assertCountForTable("SQOOP.SQ_CONFIG", 4);
+    assertCountForTable("SQOOP.SQ_INPUT", 8);
+
+    // Registered connector should be easily recovered back
+    MConnector retrieved = handler.findConnector("A", getDerbyDatabaseConnection());
+    assertNotNull(retrieved);
+    assertEquals(connector, retrieved);
+  }
+
+  @Test
+  public void testNeitherDirection() throws Exception {
+    MConnector connector = getConnector(false, false);
+
+    handler.registerConnector(connector, getDerbyDatabaseConnection());
+
+    // Connector should get persistence ID
+    assertEquals(1, connector.getPersistenceId());
+
+    // Now check content in corresponding tables
+    assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
+    assertCountForTable("SQOOP.SQ_CONFIG", 2);
+    assertCountForTable("SQOOP.SQ_INPUT", 4);
+
+    // Registered connector should be easily recovered back
+    MConnector retrieved = handler.findConnector("A", getDerbyDatabaseConnection());
+    assertNotNull(retrieved);
+    assertEquals(connector, retrieved);
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fc74316f/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
index 0752923..01a05b2 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
@@ -31,10 +31,6 @@ import org.apache.sqoop.model.MStringInput;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import static org.junit.Assert.*;
 
 /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fc74316f/shell/src/main/java/org/apache/sqoop/shell/ShowConnectorFunction.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/ShowConnectorFunction.java b/shell/src/main/java/org/apache/sqoop/shell/ShowConnectorFunction.java
index 09fb195..d605457 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/ShowConnectorFunction.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/ShowConnectorFunction.java
@@ -35,7 +35,6 @@ import static org.apache.sqoop.shell.utils.ConfigDisplayer.*;
 
 @SuppressWarnings("serial")
 public class ShowConnectorFunction extends SqoopFunction {
-  private static final char SUPPORTED_DIRECTIONS_SEPARATOR = '/';
 
   @SuppressWarnings("static-access")
   public ShowConnectorFunction() {
@@ -83,7 +82,7 @@ public class ShowConnectorFunction extends SqoopFunction {
       uniqueNames.add(connector.getUniqueName());
       versions.add(connector.getVersion());
       classes.add(connector.getClassName());
-      supportedDirections.add(getSupportedDirections(connector));
+      supportedDirections.add(connector.getSupportedDirections().toString());
     }
 
     TableDisplayer.display(header, ids, uniqueNames, versions, classes, supportedDirections);
@@ -113,33 +112,8 @@ public class ShowConnectorFunction extends SqoopFunction {
       connector.getUniqueName(),
       connector.getClassName(),
       connector.getVersion(),
-      getSupportedDirections(connector)
+      connector.getSupportedDirections().toString()
     );
     displayConnectorConfigDetails(connector, client.getConnectorConfigBundle(connector.getPersistenceId()));
   }
-
-  /**
-   * Creates a nicely formatted string for which directions are supported.
-   * Example: FROM/TO.
-   * @param connector
-   * @return String
-   */
-  private String getSupportedDirections(MConnector connector) {
-    StringBuffer supportedDirectionsBuffer = new StringBuffer();
-    SupportedDirections supportedDirections
-        = connector.getSupportedDirections();
-
-    if (supportedDirections.isDirectionSupported(Direction.FROM)) {
-      supportedDirectionsBuffer.append(Direction.FROM);
-
-      if (supportedDirections.isDirectionSupported(Direction.TO)) {
-        supportedDirectionsBuffer.append(SUPPORTED_DIRECTIONS_SEPARATOR);
-        supportedDirectionsBuffer.append(Direction.TO);
-      }
-    } else if (supportedDirections.isDirectionSupported(Direction.TO)) {
-      supportedDirectionsBuffer.append(Direction.TO);
-    }
-
-    return supportedDirectionsBuffer.toString();
-  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fc74316f/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
index 4272470..5315e1f 100644
--- a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
+++ b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
@@ -73,7 +73,7 @@ public abstract class SqoopConnector {
   public abstract From getFrom();
 
   /**
-   * @return an <tt>To</tt> that provides classes for performing export.
+   * @return an <tt>To</tt> that provides classes for performing export.n
    */
   public abstract To getTo();