You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2012/11/26 18:13:26 UTC

[5/5] git commit: SQOOP-667 Persist all properties of MSubmission metadata (Jarek Jarcec Cecho)

SQOOP-667 Persist all properties of MSubmission metadata
(Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/80a11f97
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/80a11f97
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/80a11f97

Branch: refs/heads/sqoop2
Commit: 80a11f972cf46abc3a2d8e0280d21ca47d2562aa
Parents: a348d5b
Author: Bilung Lee <bl...@apache.org>
Authored: Mon Nov 26 08:34:00 2012 -0800
Committer: Bilung Lee <bl...@apache.org>
Committed: Mon Nov 26 08:34:00 2012 -0800

----------------------------------------------------------------------
 .../apache/sqoop/submission/counter/Counters.java  |    4 +
 .../repository/derby/DerbyRepositoryHandler.java   |  222 ++++++++++-
 .../repository/derby/DerbySchemaConstants.java     |   52 +++-
 .../sqoop/repository/derby/DerbySchemaQuery.java   |  293 +++++++++++----
 .../sqoop/repository/derby/DerbyTestCase.java      |   42 ++-
 .../repository/derby/TestSubmissionHandling.java   |   87 ++++-
 6 files changed, 605 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/80a11f97/common/src/main/java/org/apache/sqoop/submission/counter/Counters.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/submission/counter/Counters.java b/common/src/main/java/org/apache/sqoop/submission/counter/Counters.java
index 12c9464..9298419 100644
--- a/common/src/main/java/org/apache/sqoop/submission/counter/Counters.java
+++ b/common/src/main/java/org/apache/sqoop/submission/counter/Counters.java
@@ -44,4 +44,8 @@ public class Counters implements Iterable<CounterGroup> {
   public Iterator<CounterGroup> iterator() {
     return groups.values().iterator();
   }
+
+  public boolean isEmpty() {
+    return groups.isEmpty();
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/80a11f97/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index 90aef4b..b9ba746 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -57,6 +57,9 @@ import org.apache.sqoop.repository.JdbcRepositoryContext;
 import org.apache.sqoop.repository.JdbcRepositoryHandler;
 import org.apache.sqoop.repository.JdbcRepositoryTransactionFactory;
 import org.apache.sqoop.submission.SubmissionStatus;
+import org.apache.sqoop.submission.counter.Counter;
+import org.apache.sqoop.submission.counter.CounterGroup;
+import org.apache.sqoop.submission.counter.Counters;
 import org.apache.sqoop.utils.StringUtils;
 
 /**
@@ -200,6 +203,9 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
     runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT);
     runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT);
     runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION);
+    runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP);
+    runQuery(QUERY_CREATE_TABLE_SQ_COUNTER);
+    runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION);
   }
 
   /**
@@ -796,7 +802,11 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
       stmt.setLong(1, submission.getJobId());
       stmt.setString(2, submission.getStatus().name());
       stmt.setTimestamp(3, new Timestamp(submission.getCreationDate().getTime()));
-      stmt.setString(4, submission.getExternalId());
+      stmt.setTimestamp(4, new Timestamp(submission.getLastUpdateDate().getTime()));
+      stmt.setString(5, submission.getExternalId());
+      stmt.setString(6, submission.getExternalLink());
+      stmt.setString(7, submission.getExceptionInfo());
+      stmt.setString(8, submission.getExceptionStackTrace());
 
       result = stmt.executeUpdate();
       if (result != 1) {
@@ -811,6 +821,12 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
       }
 
       long submissionId = rsetSubmissionId.getLong(1);
+
+      if(submission.getCounters() != null) {
+        createSubmissionCounters(submissionId, submission.getCounters(), conn);
+      }
+
+      // Save created persistence id
       submission.setPersistenceId(submissionId);
 
     } catch (SQLException ex) {
@@ -850,20 +866,32 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
   @Override
   public void updateSubmission(MSubmission submission, Connection conn) {
     PreparedStatement stmt = null;
+    PreparedStatement deleteStmt = null;
     try {
+      //  Update properties in main table
       stmt = conn.prepareStatement(STMT_UPDATE_SUBMISSION);
-      stmt.setLong(1, submission.getJobId());
-      stmt.setString(2, submission.getStatus().name());
-      stmt.setTimestamp(3, new Timestamp(submission.getCreationDate().getTime()));
-      stmt.setString(4, submission.getExternalId());
+      stmt.setString(1, submission.getStatus().name());
+      stmt.setTimestamp(2, new Timestamp(submission.getLastUpdateDate().getTime()));
+      stmt.setString(3, submission.getExceptionInfo());
+      stmt.setString(4, submission.getExceptionStackTrace());
 
       stmt.setLong(5, submission.getPersistenceId());
       stmt.executeUpdate();
 
+      // Delete previous counters
+      deleteStmt = conn.prepareStatement(STMT_DELETE_COUNTER_SUBMISSION);
+      deleteStmt.setLong(1, submission.getPersistenceId());
+      deleteStmt.executeUpdate();
+
+      // Reinsert new counters if needed
+      if(submission.getCounters() != null) {
+        createSubmissionCounters(submission.getPersistenceId(), submission.getCounters(), conn);
+      }
+
     } catch (SQLException ex) {
       throw new SqoopException(DerbyRepoError.DERBYREPO_0035, ex);
     } finally {
-      closeStatements(stmt);
+      closeStatements(stmt, deleteStmt);
     }
   }
 
@@ -901,7 +929,7 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
         rs = stmt.executeQuery();
 
         while(rs.next()) {
-          submissions.add(loadSubmission(rs));
+          submissions.add(loadSubmission(rs, conn));
         }
 
         rs.close();
@@ -934,7 +962,7 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
         return null;
       }
 
-      return loadSubmission(rs);
+      return loadSubmission(rs, conn);
     } catch (SQLException ex) {
       throw new SqoopException(DerbyRepoError.DERBYREPO_0037, ex);
     } finally {
@@ -944,20 +972,182 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
   }
 
   /**
-   * {@inheritDoc}
+   * Stores counters for given submission in repository.
+   *
+   * @param submissionId Submission id
+   * @param counters Counters that should be stored
+   * @param conn Connection to derby repository
+   * @throws SQLException
    */
-  private MSubmission loadSubmission(ResultSet rs) throws SQLException {
-     MSubmission submission = new MSubmission(
-      rs.getLong(2),
-      rs.getTimestamp(3),
-      SubmissionStatus.valueOf(rs.getString(4)),
-      rs.getString(5)
-    );
+  private void createSubmissionCounters(long submissionId, Counters counters, Connection conn) throws SQLException {
+    PreparedStatement stmt = null;
+
+    try {
+      stmt = conn.prepareStatement(STMT_INSERT_COUNTER_SUBMISSION);
+
+      for(CounterGroup group : counters) {
+        long groupId = getCounterGroupId(group, conn);
+
+        for(Counter counter: group) {
+          long counterId = getCounterId(counter, conn);
+
+          stmt.setLong(1, groupId);
+          stmt.setLong(2, counterId);
+          stmt.setLong(3, submissionId);
+          stmt.setLong(4, counter.getValue());
+
+          stmt.executeUpdate();
+        }
+      }
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * Resolves counter group database id.
+   *
+   * @param group Given group
+   * @param conn Connection to metastore
+   * @return Id
+   * @throws SQLException
+   */
+  private long getCounterGroupId(CounterGroup group, Connection conn) throws SQLException {
+    PreparedStatement select = null;
+    PreparedStatement insert = null;
+    ResultSet rsSelect = null;
+    ResultSet rsInsert = null;
+
+    try {
+      select = conn.prepareStatement(STMT_SELECT_COUNTER_GROUP);
+      select.setString(1, group.getName());
+
+      rsSelect = select.executeQuery();
+
+      if(rsSelect.next()) {
+        return rsSelect.getLong(1);
+      }
+
+      insert = conn.prepareStatement(STMT_INSERT_COUNTER_GROUP, Statement.RETURN_GENERATED_KEYS);
+      insert.setString(1, group.getName());
+      insert.executeUpdate();
+
+      rsInsert = insert.getGeneratedKeys();
+
+      if (!rsInsert.next()) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
+      }
+
+      return rsInsert.getLong(1);
+    } finally {
+      closeResultSets(rsSelect, rsInsert);
+      closeStatements(select, insert);
+    }
+  }
+
+  /**
+   * Resolves counter id.
+   *
+   * @param counter Given counter
+   * @param conn connection to metastore
+   * @return Id
+   * @throws SQLException
+   */
+  private long getCounterId(Counter counter, Connection conn) throws SQLException {
+    PreparedStatement select = null;
+    PreparedStatement insert = null;
+    ResultSet rsSelect = null;
+    ResultSet rsInsert = null;
+
+    try {
+      select = conn.prepareStatement(STMT_SELECT_COUNTER);
+      select.setString(1, counter.getName());
+
+      rsSelect = select.executeQuery();
+
+      if(rsSelect.next()) {
+        return rsSelect.getLong(1);
+      }
+
+      insert = conn.prepareStatement(STMT_INSERT_COUNTER, Statement.RETURN_GENERATED_KEYS);
+      insert.setString(1, counter.getName());
+      insert.executeUpdate();
+
+      rsInsert = insert.getGeneratedKeys();
+
+      if (!rsInsert.next()) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
+      }
+
+      return rsInsert.getLong(1);
+    } finally {
+      closeResultSets(rsSelect, rsInsert);
+      closeStatements(select, insert);
+    }
+  }
+
+  /**
+   * Create MSubmission structure from result set.
+   *
+   * @param rs Result set, only active row will be fetched
+   * @param conn Connection to metastore
+   * @return Created MSubmission structure
+   * @throws SQLException
+   */
+  private MSubmission loadSubmission(ResultSet rs, Connection conn) throws SQLException {
+    MSubmission submission = new MSubmission();
+
     submission.setPersistenceId(rs.getLong(1));
+    submission.setJobId(rs.getLong(2));
+    submission.setStatus(SubmissionStatus.valueOf(rs.getString(3)));
+    submission.setCreationDate(rs.getTimestamp(4));
+    submission.setLastUpdateDate(rs.getTimestamp(5));
+    submission.setExternalId(rs.getString(6));
+    submission.setExternalLink(rs.getString(7));
+    submission.setExceptionInfo(rs.getString(8));
+    submission.setExceptionStackTrace(rs.getString(9));
+
+    Counters counters = loadCountersSubmission(rs.getLong(1), conn);
+    submission.setCounters(counters);
 
     return submission;
   }
 
+  private Counters loadCountersSubmission(long submissionId, Connection conn) throws SQLException {
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = conn.prepareStatement(STMT_SELECT_COUNTER_SUBMISSION);
+      stmt.setLong(1, submissionId);
+      rs = stmt.executeQuery();
+
+      Counters counters = new Counters();
+
+      while(rs.next()) {
+        String groupName = rs.getString(1);
+        String counterName = rs.getString(2);
+        long value = rs.getLong(3);
+
+        CounterGroup group = counters.getCounterGroup(groupName);
+        if(group == null) {
+          group = new CounterGroup(groupName);
+          counters.addCounterGroup(group);
+        }
+
+        group.addCounter(new Counter(counterName, value));
+      }
+
+      if(counters.isEmpty()) {
+        return null;
+      } else {
+        return counters;
+      }
+    } finally {
+      closeStatements(stmt);
+      closeResultSets(rs);
+    }
+  }
+
   private List<MConnection> loadConnections(PreparedStatement stmt,
                                             Connection conn)
                                             throws SQLException {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/80a11f97/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
index 3664df6..1d1fc09 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
@@ -146,12 +146,60 @@ public final class DerbySchemaConstants {
 
   public static final String COLUMN_SQS_JOB = "SQS_JOB";
 
-  public static final String COLUMN_SQS_DATE = "SQS_DATE";
-
   public static final String COLUMN_SQS_STATUS = "SQS_STATUS";
 
+  public static final String COLUMN_SQS_CREATION_DATE = "SQS_CREATION_DATE";
+
+  public static final String COLUMN_SQS_UPDATE_DATE = "SQS_UPDATE_DATE";
+
   public static final String COLUMN_SQS_EXTERNAL_ID = "SQS_EXTERNAL_ID";
 
+  public static final String COLUMN_SQS_EXTERNAL_LINK = "SQS_EXTERNAL_LINK";
+
+  public static final String COLUMN_SQS_EXCEPTION = "SQS_EXCEPTION";
+
+  public static final String COLUMN_SQS_EXCEPTION_TRACE = "SQS_EXCEPTION_TRACE";
+
+  // SQ_COUNTER_GROUP
+
+  public static final String TABLE_SQ_COUNTER_GROUP_NAME =
+      "SQ_COUNTER_GROUP";
+
+  public static final String TABLE_SQ_COUNTER_GROUP = SCHEMA_PREFIX
+      + TABLE_SQ_COUNTER_GROUP_NAME;
+
+  public static final String COLUMN_SQG_ID = "SQG_ID";
+
+  public static final String COLUMN_SQG_NAME = "SQG_NAME";
+
+  // SQ_COUNTER
+
+  public static final String TABLE_SQ_COUNTER_NAME =
+      "SQ_COUNTER";
+
+  public static final String TABLE_SQ_COUNTER = SCHEMA_PREFIX
+      + TABLE_SQ_COUNTER_NAME;
+
+  public static final String COLUMN_SQR_ID = "SQR_ID";
+
+  public static final String COLUMN_SQR_NAME = "SQR_NAME";
+
+  // SQ_COUNTER_SUBMISSION
+
+  public static final String TABLE_SQ_COUNTER_SUBMISSION_NAME =
+      "SQ_COUNTER_SUBMISSION";
+
+  public static final String TABLE_SQ_COUNTER_SUBMISSION = SCHEMA_PREFIX
+      + TABLE_SQ_COUNTER_SUBMISSION_NAME;
+
+  public static final String COLUMN_SQRS_GROUP = "SQRS_GROUP";
+
+  public static final String COLUMN_SQRS_COUNTER = "SQRS_COUNTER";
+
+  public static final String COLUMN_SQRS_SUBMISSION = "SQRS_SUBMISSION";
+
+  public static final String COLUMN_SQRS_VALUE = "SQRS_VALUE";
+
   private DerbySchemaConstants() {
     // Disable explicit object creation
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/80a11f97/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
index 496e3d9..55d7128 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
@@ -119,14 +119,53 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  * <p>
  * <strong>SQ_SUBMISSION</strong>: List of submissions
  * <pre>
+ *    +-----------------------------------+
+ *    | SQ_SUBMISSION                     |
+ *    +-----------------------------------+
+ *    | SQS_ID: BIGINT PK                 |
+ *    | SQS_JOB: BIGINT                   | FK SQ_JOB(SQB_ID)
+ *    | SQS_STATUS: VARCHAR(20)           |
+ *    | SQS_CREATION_DATE: TIMESTAMP      |
+ *    | SQS_UPDATE_DATE: TIMESTAMP        |
+ *    | SQS_EXTERNAL_ID: VARCHAR(25)      |
+ *    | SQS_EXTERNAL_LINK: VARCHAR(75)    |
+ *    | SQS_EXCEPTION: VARCHAR(75)        |
+ *    | SQS_EXCEPTION_TRACE: VARCHAR(500) |
+ *    +-----------------------------------+
+ * </pre>
+ * </p>
+ * <p>
+ * <strong>SQ_COUNTER_GROUP</strong>: List of counter groups
+ * <pre>
+ *    +----------------------------+
+ *    | SQ_COUNTER_GROUP           |
+ *    +----------------------------+
+ *    | SQG_ID: BIGINT PK          |
+ *    | SQG_NAME: VARCHAR(50)      |
  *    +----------------------------+
- *    | SQ_JOB_SUBMISSION          |
+ * </pre>
+ * </p>
+ * <p>
+ * <strong>SQ_COUNTER</strong>: List of counters
+ * <pre>
+ *    +----------------------------+
+ *    | SQ_COUNTER                 |
  *    +----------------------------+
- *    | SQS_ID: BIGINT PK          |
- *    | SQS_JOB: BIGINT            | FK SQ_JOB(SQB_ID)
- *    | SQS_STATUS: VARCHAR(20)    |
- *    | SQS_DATE: TIMESTAMP        |
- *    | SQS_EXTERNAL_ID:VARCHAR(50)|
+ *    | SQR_ID: BIGINT PK          |
+ *    | SQR_NAME: VARCHAR(50)      |
+ *    +----------------------------+
+ * </pre>
+ * </p>
+ * <p>
+ * <strong>SQ_COUNTER_SUBMISSION</strong>: N:M Relationship
+ * <pre>
+ *    +----------------------------+
+ *    | SQ_COUNTER_SUBMISSION      |
+ *    +----------------------------+
+ *    | SQRS_GROUP: BIGINT PK      | FK SQ_COUNTER_GROUP(SQG_ID)
+ *    | SQRS_COUNTER: BIGINT PK    | FK SQ_COUNTER(SQR_ID)
+ *    | SQRS_SUBMISSION: BIGINT PK | FK SQ_SUBMISSION(SQS_ID)
+ *    | SQRS_VALUE: BIGINT         |
  *    +----------------------------+
  * </pre>
  * </p>
@@ -143,69 +182,77 @@ public final class DerbySchemaQuery {
 
   // DDL: Create table SQ_CONNECTOR
   public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR =
-      "CREATE TABLE " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID
-      + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) "
-      + "PRIMARY KEY, " + COLUMN_SQC_NAME + " VARCHAR(64), " + COLUMN_SQC_CLASS
-      + " VARCHAR(255))";
+      "CREATE TABLE " + TABLE_SQ_CONNECTOR + " ("
+      + COLUMN_SQC_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+      + COLUMN_SQC_NAME + " VARCHAR(64), "
+      + COLUMN_SQC_CLASS + " VARCHAR(255)"
+      + ")";
 
   // DDL: Create table SQ_FORM
   public static final String QUERY_CREATE_TABLE_SQ_FORM =
-      "CREATE TABLE " + TABLE_SQ_FORM + " (" + COLUMN_SQF_ID
-      + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) "
-      + "PRIMARY KEY, " + COLUMN_SQF_CONNECTOR + " BIGINT, "
+      "CREATE TABLE " + TABLE_SQ_FORM + " ("
+      + COLUMN_SQF_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+      + COLUMN_SQF_CONNECTOR + " BIGINT, "
       + COLUMN_SQF_OPERATION + " VARCHAR(32), "
-      + COLUMN_SQF_NAME + " VARCHAR(64), " + COLUMN_SQF_TYPE + " VARCHAR(32), "
-      + COLUMN_SQF_INDEX + " SMALLINT, " + " FOREIGN KEY ("
-      + COLUMN_SQF_CONNECTOR+ ") REFERENCES " + TABLE_SQ_CONNECTOR + " ("
-      + COLUMN_SQC_ID + "))";
+      + COLUMN_SQF_NAME + " VARCHAR(64), "
+      + COLUMN_SQF_TYPE + " VARCHAR(32), "
+      + COLUMN_SQF_INDEX + " SMALLINT, "
+      + " FOREIGN KEY (" + COLUMN_SQF_CONNECTOR+ ") REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + ")"
+      + ")";
 
   // DDL: Create table SQ_INPUT
   public static final String QUERY_CREATE_TABLE_SQ_INPUT =
-      "CREATE TABLE " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID
-      + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) "
-      + "PRIMARY KEY, " + COLUMN_SQI_NAME + " VARCHAR(64), "
-      + COLUMN_SQI_FORM + " BIGINT, " + COLUMN_SQI_INDEX + " SMALLINT, "
-      + COLUMN_SQI_TYPE + " VARCHAR(32), " + COLUMN_SQI_STRMASK + " BOOLEAN, "
-      + COLUMN_SQI_STRLENGTH + " SMALLINT, " + COLUMN_SQI_ENUMVALS
-      + " VARCHAR(100), FOREIGN KEY (" + COLUMN_SQI_FORM + ") REFERENCES "
-      + TABLE_SQ_FORM + " (" + COLUMN_SQF_ID + "))";
+      "CREATE TABLE " + TABLE_SQ_INPUT + " ("
+      + COLUMN_SQI_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+      + COLUMN_SQI_NAME + " VARCHAR(64), "
+      + COLUMN_SQI_FORM + " BIGINT, "
+      + COLUMN_SQI_INDEX + " SMALLINT, "
+      + COLUMN_SQI_TYPE + " VARCHAR(32), "
+      + COLUMN_SQI_STRMASK + " BOOLEAN, "
+      + COLUMN_SQI_STRLENGTH + " SMALLINT, "
+      + COLUMN_SQI_ENUMVALS + " VARCHAR(100),"
+      + " FOREIGN KEY (" + COLUMN_SQI_FORM + ") REFERENCES " + TABLE_SQ_FORM + " (" + COLUMN_SQF_ID + ")"
+      + ")";
 
   // DDL: Create table SQ_CONNECTION
   public static final String QUERY_CREATE_TABLE_SQ_CONNECTION =
-      "CREATE TABLE " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID
-      + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) "
-      + "PRIMARY KEY, " + COLUMN_SQN_CONNECTOR + " BIGINT, " + COLUMN_SQN_NAME
-      + " VARCHAR(32), FOREIGN KEY(" + COLUMN_SQN_CONNECTOR + ") REFERENCES "
-      + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + "))";
+      "CREATE TABLE " + TABLE_SQ_CONNECTION + " ("
+      + COLUMN_SQN_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+      + COLUMN_SQN_CONNECTOR + " BIGINT, "
+      + COLUMN_SQN_NAME  + " VARCHAR(32),"
+      + " FOREIGN KEY(" + COLUMN_SQN_CONNECTOR + ") REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + ")"
+      + ")";
 
   // DDL: Create table SQ_JOB
   public static final String QUERY_CREATE_TABLE_SQ_JOB =
-      "CREATE TABLE " + TABLE_SQ_JOB + " (" + COLUMN_SQB_ID
-      + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) "
-      + "PRIMARY KEY, " + COLUMN_SQB_CONNECTION + " BIGINT, " + COLUMN_SQB_NAME
-      + " VARCHAR(64), " + COLUMN_SQB_TYPE + " VARCHAR(64), FOREIGN KEY("
-      + COLUMN_SQB_CONNECTION + ") REFERENCES " + TABLE_SQ_CONNECTION + " ("
-      + COLUMN_SQN_ID + "))";
+      "CREATE TABLE " + TABLE_SQ_JOB + " ("
+      + COLUMN_SQB_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+      + COLUMN_SQB_CONNECTION + " BIGINT, "
+      + COLUMN_SQB_NAME + " VARCHAR(64), "
+      + COLUMN_SQB_TYPE + " VARCHAR(64),"
+      + " FOREIGN KEY(" + COLUMN_SQB_CONNECTION + ") REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")"
+      + ")";
 
   // DDL: Create table SQ_CONNECTION_INPUT
   public static final String QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT =
       "CREATE TABLE " + TABLE_SQ_CONNECTION_INPUT + " ("
-      + COLUMN_SQNI_CONNECTION + " BIGINT, " + COLUMN_SQNI_INPUT + " BIGINT, "
-      + COLUMN_SQNI_VALUE + " LONG VARCHAR, PRIMARY KEY ("
-      + COLUMN_SQNI_CONNECTION + ", " + COLUMN_SQNI_INPUT + "), FOREIGN KEY ("
-      + COLUMN_SQNI_CONNECTION + ") REFERENCES " + TABLE_SQ_CONNECTION + " ("
-      + COLUMN_SQN_ID + "), FOREIGN KEY (" + COLUMN_SQNI_INPUT + ") REFERENCES "
-      + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + "))";
+      + COLUMN_SQNI_CONNECTION + " BIGINT, "
+      + COLUMN_SQNI_INPUT + " BIGINT, "
+      + COLUMN_SQNI_VALUE + " LONG VARCHAR,"
+      + " PRIMARY KEY (" + COLUMN_SQNI_CONNECTION + ", " + COLUMN_SQNI_INPUT + "),"
+      + " FOREIGN KEY (" + COLUMN_SQNI_CONNECTION + ") REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + "),"
+      + " FOREIGN KEY (" + COLUMN_SQNI_INPUT + ") REFERENCES " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + ")"
+      + ")";
 
   // DDL: Create table SQ_JOB_INPUT
   public static final String QUERY_CREATE_TABLE_SQ_JOB_INPUT =
       "CREATE TABLE " + TABLE_SQ_JOB_INPUT + " ("
-      + COLUMN_SQBI_JOB + " BIGINT, " + COLUMN_SQBI_INPUT + " BIGINT, "
-      + COLUMN_SQBI_VALUE + " LONG VARCHAR, PRIMARY KEY ("
-      + COLUMN_SQBI_JOB + ", " + COLUMN_SQBI_INPUT + "), FOREIGN KEY ("
-      + COLUMN_SQBI_JOB + ") REFERENCES " + TABLE_SQ_JOB + " ("
-      + COLUMN_SQB_ID + "), FOREIGN KEY (" + COLUMN_SQBI_INPUT + ") REFERENCES "
-      + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + "))";
+      + COLUMN_SQBI_JOB + " BIGINT, "
+      + COLUMN_SQBI_INPUT + " BIGINT, "
+      + COLUMN_SQBI_VALUE + " LONG VARCHAR,"
+      + " PRIMARY KEY (" + COLUMN_SQBI_JOB + ", " + COLUMN_SQBI_INPUT + "), "
+      + " FOREIGN KEY (" + COLUMN_SQBI_JOB + ") REFERENCES " + TABLE_SQ_JOB + " (" + COLUMN_SQB_ID + "), "
+      + " FOREIGN KEY (" + COLUMN_SQBI_INPUT + ") REFERENCES " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + "))";
 
   // DDL: Create table SQ_SUBMISSION
   public static final String QUERY_CREATE_TABLE_SQ_SUBMISSION =
@@ -213,11 +260,46 @@ public final class DerbySchemaQuery {
     + COLUMN_SQS_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
     + COLUMN_SQS_JOB + " BIGINT, "
     + COLUMN_SQS_STATUS + " VARCHAR(20), "
-    + COLUMN_SQS_DATE + " TIMESTAMP,"
-    + COLUMN_SQS_EXTERNAL_ID + " VARCHAR(50), "
+    + COLUMN_SQS_CREATION_DATE + " TIMESTAMP,"
+    + COLUMN_SQS_UPDATE_DATE + " TIMESTAMP,"
+    + COLUMN_SQS_EXTERNAL_ID + " VARCHAR(25), "
+    + COLUMN_SQS_EXTERNAL_LINK + " VARCHAR(75), "
+    + COLUMN_SQS_EXCEPTION + " VARCHAR(75), "
+    + COLUMN_SQS_EXCEPTION_TRACE + " VARCHAR(500), "
     + "PRIMARY KEY (" + COLUMN_SQS_ID + "), "
-    + "FOREIGN KEY (" + COLUMN_SQS_JOB + ") REFERENCES " + TABLE_SQ_JOB + "("
-    +   COLUMN_SQB_ID + "))";
+    + "FOREIGN KEY (" + COLUMN_SQS_JOB + ") REFERENCES " + TABLE_SQ_JOB + "("  + COLUMN_SQB_ID + ") ON DELETE CASCADE"
+    +  ")";
+
+  // DDL: Create table SQ_COUNTER_GROUP
+  public static final String QUERY_CREATE_TABLE_SQ_COUNTER_GROUP =
+    "CREATE TABLE " + TABLE_SQ_COUNTER_GROUP + " ("
+    + COLUMN_SQG_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+    + COLUMN_SQG_NAME + " VARCHAR(50), "
+    + "PRIMARY KEY (" + COLUMN_SQG_ID + "),"
+    + "UNIQUE ( " + COLUMN_SQG_NAME + ")"
+    + ")";
+
+  // DDL: Create table SQ_COUNTER
+  public static final String QUERY_CREATE_TABLE_SQ_COUNTER =
+    "CREATE TABLE " + TABLE_SQ_COUNTER + " ("
+    + COLUMN_SQR_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+    + COLUMN_SQR_NAME + " VARCHAR(50), "
+    + "PRIMARY KEY (" + COLUMN_SQR_ID + "), "
+    + "UNIQUE ( " + COLUMN_SQR_NAME + ")"
+    + ")";
+
+  // DDL: Create table SQ_COUNTER_SUBMISSION
+  public static final String QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION =
+    "CREATE TABLE " + TABLE_SQ_COUNTER_SUBMISSION + " ("
+    + COLUMN_SQRS_GROUP + " BIGINT, "
+    + COLUMN_SQRS_COUNTER + " BIGINT, "
+    + COLUMN_SQRS_SUBMISSION + " BIGINT, "
+    + COLUMN_SQRS_VALUE + " BIGINT, "
+    + "PRIMARY KEY (" + COLUMN_SQRS_GROUP + ", " + COLUMN_SQRS_COUNTER + ", " + COLUMN_SQRS_SUBMISSION + "), "
+    + "FOREIGN KEY (" + COLUMN_SQRS_GROUP + ") REFERENCES " + TABLE_SQ_COUNTER_GROUP + "(" + COLUMN_SQG_ID + "), "
+    + "FOREIGN KEY (" + COLUMN_SQRS_COUNTER + ") REFERENCES " + TABLE_SQ_COUNTER + "(" + COLUMN_SQR_ID + "), "
+    + "FOREIGN KEY (" + COLUMN_SQRS_SUBMISSION + ") REFERENCES " + TABLE_SQ_SUBMISSION + "(" + COLUMN_SQS_ID + ") ON DELETE CASCADE "
+    + ")";
 
   // DML: Fetch connector Given Name
   public static final String STMT_FETCH_BASE_CONNECTOR =
@@ -384,18 +466,22 @@ public final class DerbySchemaQuery {
     "INSERT INTO " + TABLE_SQ_SUBMISSION + "("
     + COLUMN_SQS_JOB + ", "
     + COLUMN_SQS_STATUS + ", "
-    + COLUMN_SQS_DATE + ", "
-    + COLUMN_SQS_EXTERNAL_ID + ") "
-    + " VALUES(?, ?, ?, ?)";
+    + COLUMN_SQS_CREATION_DATE + ", "
+    + COLUMN_SQS_UPDATE_DATE + ", "
+    + COLUMN_SQS_EXTERNAL_ID + ", "
+    + COLUMN_SQS_EXTERNAL_LINK + ", "
+    + COLUMN_SQS_EXCEPTION + ", "
+    + COLUMN_SQS_EXCEPTION_TRACE + ") "
+    + " VALUES(?, ?, ?, ?, ?, ?, ?, ?)";
 
   // DML: Update existing submission
   public static final String STMT_UPDATE_SUBMISSION =
     "UPDATE " + TABLE_SQ_SUBMISSION + " SET "
-    + COLUMN_SQS_JOB + " = ?, "
     + COLUMN_SQS_STATUS + " = ?, "
-    + COLUMN_SQS_DATE + " = ?, "
-    + COLUMN_SQS_EXTERNAL_ID + " = ? "
-    + "WHERE " + COLUMN_SQS_ID + " = ?";
+    + COLUMN_SQS_UPDATE_DATE + " = ?, "
+    + COLUMN_SQS_EXCEPTION + " = ?, "
+    + COLUMN_SQS_EXCEPTION_TRACE + " = ?"
+    + " WHERE " + COLUMN_SQS_ID + " = ?";
 
   // DML: Check if given submission exists
   public static final String STMT_SELECT_SUBMISSION_CHECK =
@@ -404,20 +490,93 @@ public final class DerbySchemaQuery {
 
   // DML: Purge old entries
   public static final String STMT_PURGE_SUBMISSIONS =
-    "DELETE FROM " + TABLE_SQ_SUBMISSION + " WHERE " + COLUMN_SQS_DATE + " < ?";
+    "DELETE FROM " + TABLE_SQ_SUBMISSION
+    + " WHERE " + COLUMN_SQS_UPDATE_DATE + " < ?";
 
   // DML: Get unfinished
   public static final String STMT_SELECT_SUBMISSION_UNFINISHED =
-    "SELECT " + COLUMN_SQS_ID + ", " + COLUMN_SQS_JOB + ", " + COLUMN_SQS_DATE
-    + ", " + COLUMN_SQS_STATUS + ", " + COLUMN_SQS_EXTERNAL_ID + " FROM "
-    + TABLE_SQ_SUBMISSION + " WHERE " + COLUMN_SQS_STATUS + " = ?";
+    "SELECT "
+    + COLUMN_SQS_ID + ", "
+    + COLUMN_SQS_JOB + ", "
+    + COLUMN_SQS_STATUS + ", "
+    + COLUMN_SQS_CREATION_DATE + ", "
+    + COLUMN_SQS_UPDATE_DATE + ", "
+    + COLUMN_SQS_EXTERNAL_ID + ", "
+    + COLUMN_SQS_EXTERNAL_LINK + ", "
+    + COLUMN_SQS_EXCEPTION + ", "
+    + COLUMN_SQS_EXCEPTION_TRACE
+    + " FROM " + TABLE_SQ_SUBMISSION
+    + " WHERE " + COLUMN_SQS_STATUS + " = ?";
 
   // DML: Last submission for a job
   public static final String STMT_SELECT_SUBMISSION_LAST_FOR_JOB =
-    "SELECT " + COLUMN_SQS_ID + ", " + COLUMN_SQS_JOB + ", " + COLUMN_SQS_DATE
-      + ", " + COLUMN_SQS_STATUS + ", " + COLUMN_SQS_EXTERNAL_ID + " FROM "
-      + TABLE_SQ_SUBMISSION + " WHERE " + COLUMN_SQS_JOB + " = ? ORDER BY "
-      + COLUMN_SQS_DATE + " DESC";
+    "SELECT "
+    + COLUMN_SQS_ID + ", "
+    + COLUMN_SQS_JOB + ", "
+    + COLUMN_SQS_STATUS + ", "
+    + COLUMN_SQS_CREATION_DATE + ", "
+    + COLUMN_SQS_UPDATE_DATE + ", "
+    + COLUMN_SQS_EXTERNAL_ID + ", "
+    + COLUMN_SQS_EXTERNAL_LINK + ", "
+    + COLUMN_SQS_EXCEPTION + ", "
+    + COLUMN_SQS_EXCEPTION_TRACE
+    + " FROM " + TABLE_SQ_SUBMISSION
+    + " WHERE " + COLUMN_SQS_JOB + " = ?"
+    + " ORDER BY " + COLUMN_SQS_UPDATE_DATE + " DESC";
+
+  // DML: Select counter group
+  public static final String STMT_SELECT_COUNTER_GROUP =
+    "SELECT "
+    + COLUMN_SQG_ID + ", "
+    + COLUMN_SQG_NAME + " "
+    + "FROM " + TABLE_SQ_COUNTER_GROUP + " "
+    + "WHERE " + COLUMN_SQG_NAME + " = ?";
+
+  // DML: Insert new counter group
+  public static final String STMT_INSERT_COUNTER_GROUP =
+    "INSERT INTO " + TABLE_SQ_COUNTER_GROUP + " ("
+    + COLUMN_SQG_NAME + ") "
+    + "VALUES (?)";
+
+  // DML: Select counter
+  public static final String STMT_SELECT_COUNTER =
+    "SELECT "
+    + COLUMN_SQR_ID + ", "
+    + COLUMN_SQR_NAME + " "
+    + "FROM " + TABLE_SQ_COUNTER + " "
+    + "WHERE " + COLUMN_SQR_NAME + " = ?";
+
+  // DML: Insert new counter
+  public static final String STMT_INSERT_COUNTER =
+    "INSERT INTO " + TABLE_SQ_COUNTER + " ("
+    + COLUMN_SQR_NAME + ") "
+    + "VALUES (?)";
+
+  // DML: Insert new counter submission
+  public static final String STMT_INSERT_COUNTER_SUBMISSION =
+    "INSERT INTO " + TABLE_SQ_COUNTER_SUBMISSION + " ("
+    + COLUMN_SQRS_GROUP + ", "
+    + COLUMN_SQRS_COUNTER + ", "
+    + COLUMN_SQRS_SUBMISSION + ", "
+    + COLUMN_SQRS_VALUE + ") "
+    + "VALUES (?, ?, ?, ?)";
+
+  // DML: Select counter submission
+  public static final String STMT_SELECT_COUNTER_SUBMISSION =
+    "SELECT "
+    + COLUMN_SQG_NAME + ", "
+    + COLUMN_SQR_NAME + ", "
+    + COLUMN_SQRS_VALUE + " "
+    + "FROM " + TABLE_SQ_COUNTER_SUBMISSION + " "
+    + "LEFT JOIN " + TABLE_SQ_COUNTER_GROUP + " ON " + COLUMN_SQRS_GROUP + " = " + COLUMN_SQG_ID + " "
+    + "LEFT JOIN " + TABLE_SQ_COUNTER + " ON " + COLUMN_SQRS_COUNTER + " = " + COLUMN_SQR_ID + " "
+    + "WHERE " + COLUMN_SQRS_SUBMISSION + " = ? ";
+
+  // DML: Delete rows from counter submission table
+  public static final String STMT_DELETE_COUNTER_SUBMISSION =
+    "DELETE FROM " + TABLE_SQ_COUNTER_SUBMISSION
+    + " WHERE " + COLUMN_SQRS_SUBMISSION + " = ?";
+
 
   private DerbySchemaQuery() {
     // Disable explicit object creation

http://git-wip-us.apache.org/repos/asf/sqoop/blob/80a11f97/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
index 7aa362e..0efa19d 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
@@ -95,6 +95,9 @@ abstract public class DerbyTestCase extends TestCase {
     runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT);
     runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT);
     runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION);
+    runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP);
+    runQuery(QUERY_CREATE_TABLE_SQ_COUNTER);
+    runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION);
   }
 
   /**
@@ -253,14 +256,41 @@ abstract public class DerbyTestCase extends TestCase {
    * @throws Exception
    */
   public void loadSubmissions() throws  Exception {
+    runQuery("INSERT INTO SQOOP.SQ_COUNTER_GROUP "
+      + "(SQG_NAME) "
+      + "VALUES"
+      + "('gA'), ('gB')"
+    );
+
+    runQuery("INSERT INTO SQOOP.SQ_COUNTER "
+      + "(SQR_NAME) "
+      + "VALUES"
+      + "('cA'), ('cB')"
+    );
+
     runQuery("INSERT INTO SQOOP.SQ_SUBMISSION"
-      + "(SQS_JOB, SQS_STATUS, SQS_DATE, SQS_EXTERNAL_ID) VALUES "
-      + "(1, 'RUNNING', '2012-01-01 01:01:01', 'job_1'),"
-      + "(2, 'SUCCEEDED', '2012-01-02 01:01:01', 'job_2'),"
-      + "(3, 'FAILED', '2012-01-03 01:01:01', 'job_3'),"
-      + "(4, 'UNKNOWN', '2012-01-04 01:01:01', 'job_4'),"
-      + "(1, 'RUNNING', '2012-01-05 01:01:01', 'job_5')"
+      + "(SQS_JOB, SQS_STATUS, SQS_CREATION_DATE, SQS_UPDATE_DATE,"
+      + " SQS_EXTERNAL_ID, SQS_EXTERNAL_LINK, SQS_EXCEPTION,"
+      + " SQS_EXCEPTION_TRACE)"
+      + "VALUES "
+      + "(1, 'RUNNING', '2012-01-01 01:01:01', '2012-01-01 01:01:01', 'job_1',"
+      +   "NULL, NULL, NULL),"
+      + "(2, 'SUCCEEDED', '2012-01-01 01:01:01', '2012-01-02 01:01:01', 'job_2',"
+      + " NULL, NULL, NULL),"
+      + "(3, 'FAILED', '2012-01-01 01:01:01', '2012-01-03 01:01:01', 'job_3',"
+      + " NULL, NULL, NULL),"
+      + "(4, 'UNKNOWN', '2012-01-01 01:01:01', '2012-01-04 01:01:01', 'job_4',"
+      + " NULL, NULL, NULL),"
+      + "(1, 'RUNNING', '2012-01-01 01:01:01', '2012-01-05 01:01:01', 'job_5',"
+      + " NULL, NULL, NULL)"
     );
+
+    runQuery("INSERT INTO SQOOP.SQ_COUNTER_SUBMISSION "
+      + "(SQRS_GROUP, SQRS_COUNTER, SQRS_SUBMISSION, SQRS_VALUE) "
+      + "VALUES"
+      + "(1, 1, 4, 300)"
+    );
+
   }
 
   protected MConnector getConnector() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/80a11f97/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java
index 3433b20..8fce0dd 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java
@@ -19,6 +19,9 @@ package org.apache.sqoop.repository.derby;
 
 import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.submission.SubmissionStatus;
+import org.apache.sqoop.submission.counter.Counter;
+import org.apache.sqoop.submission.counter.CounterGroup;
+import org.apache.sqoop.submission.counter.Counters;
 
 import java.util.Calendar;
 import java.util.Date;
@@ -27,7 +30,7 @@ import java.util.List;
 /**
  *
  */
-public class TestSubmissionHandling extends  DerbyTestCase {
+public class TestSubmissionHandling extends DerbyTestCase {
 
   DerbyRepositoryHandler handler;
 
@@ -84,8 +87,29 @@ public class TestSubmissionHandling extends  DerbyTestCase {
   }
 
   public void testCreateSubmission() throws Exception {
-    MSubmission submission =
-      new MSubmission(1, new Date(), SubmissionStatus.RUNNING, "job-x");
+    Date creationDate = new Date();
+    Date updateDate = new Date();
+
+    CounterGroup firstGroup = new CounterGroup("ga");
+    CounterGroup secondGroup = new CounterGroup("gb");
+    firstGroup.addCounter(new Counter("ca", 100));
+    firstGroup.addCounter(new Counter("cb", 200));
+    secondGroup.addCounter(new Counter("ca", 300));
+    secondGroup.addCounter(new Counter("cd", 400));
+    Counters counters = new Counters();
+    counters.addCounterGroup(firstGroup);
+    counters.addCounterGroup(secondGroup);
+
+    MSubmission submission = new MSubmission();
+    submission.setJobId(1);
+    submission.setStatus(SubmissionStatus.RUNNING);
+    submission.setCreationDate(creationDate);
+    submission.setLastUpdateDate(updateDate);
+    submission.setExternalId("job-x");
+    submission.setExternalLink("http://somewhere");
+    submission.setExceptionInfo("RuntimeException");
+    submission.setExceptionStackTrace("Yeah it happens");
+    submission.setCounters(counters);
 
     handler.createSubmission(submission, getDerbyConnection());
 
@@ -101,9 +125,41 @@ public class TestSubmissionHandling extends  DerbyTestCase {
 
     assertEquals(1, submission.getJobId());
     assertEquals(SubmissionStatus.RUNNING, submission.getStatus());
+    assertEquals(creationDate, submission.getCreationDate());
+    assertEquals(updateDate, submission.getLastUpdateDate());
     assertEquals("job-x", submission.getExternalId());
+    assertEquals("http://somewhere", submission.getExternalLink());
+    assertEquals("RuntimeException", submission.getExceptionInfo());
+    assertEquals("Yeah it happens", submission.getExceptionStackTrace());
 
-    // Let's create second connection
+    CounterGroup group;
+    Counter counter;
+    Counters retrievedCounters = submission.getCounters();
+    assertNotNull(retrievedCounters);
+    // Check the counters read back from the repository, not the local input.
+    group = retrievedCounters.getCounterGroup("ga");
+    assertNotNull(group);
+
+    counter = group.getCounter("ca");
+    assertNotNull(counter);
+    assertEquals(100, counter.getValue());
+
+    counter = group.getCounter("cb");
+    assertNotNull(counter);
+    assertEquals(200, counter.getValue());
+
+    group = retrievedCounters.getCounterGroup("gb");
+    assertNotNull(group);
+
+    counter = group.getCounter("ca");
+    assertNotNull(counter);
+    assertEquals(300, counter.getValue());
+
+    counter = group.getCounter("cd");
+    assertNotNull(counter);
+    assertEquals(400, counter.getValue());
+
+    // Let's create second (simpler) submission
     submission =
       new MSubmission(1, new Date(), SubmissionStatus.SUCCEEDED, "job-x");
     handler.createSubmission(submission, getDerbyConnection());
@@ -163,4 +219,27 @@ public class TestSubmissionHandling extends  DerbyTestCase {
     assertEquals(0, submissions.size());
     assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
   }
+
+  /**
+   * Test that by directly removing jobs we will also remove associated
+   * submissions and counters.
+   *
+   * @throws Exception
+   */
+  public void testDeleteJobs() throws Exception {
+    loadSubmissions();
+    assertCountForTable("SQOOP.SQ_SUBMISSION", 5);
+
+    handler.deleteJob(1, getDerbyConnection());
+    assertCountForTable("SQOOP.SQ_SUBMISSION", 3);
+
+    handler.deleteJob(2, getDerbyConnection());
+    assertCountForTable("SQOOP.SQ_SUBMISSION", 2);
+
+    handler.deleteJob(3, getDerbyConnection());
+    assertCountForTable("SQOOP.SQ_SUBMISSION", 1);
+
+    handler.deleteJob(4, getDerbyConnection());
+    assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
+  }
 }