You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/01/30 01:09:14 UTC

incubator-gobblin git commit: [GOBBLIN-392] Load all dataset states when getLatestDatasetStatesByUrns() is called

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 378ccaa8a -> c35f76e4e


[GOBBLIN-392] Load all dataset states when getLatestDatasetStatesByUrns() is called

Closes #2268 from htran1/fix_dataset_state_fetch


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c35f76e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c35f76e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c35f76e4

Branch: refs/heads/master
Commit: c35f76e4e2a0f9f25580924d46cef1b732af7d63
Parents: 378ccaa
Author: Hung Tran <hu...@linkedin.com>
Authored: Mon Jan 29 17:08:46 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Jan 29 17:08:46 2018 -0800

----------------------------------------------------------------------
 .../gobblin/runtime/ZkDatasetStateStore.java     |  4 ++--
 .../gobblin/runtime/ZkDatasetStateStoreTest.java | 19 ++++++++++++++++++-
 .../gobblin/runtime/MysqlDatasetStateStore.java  |  4 ++--
 .../runtime/MysqlDatasetStateStoreTest.java      | 19 ++++++++++++++++++-
 4 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c35f76e4/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
index dbde3fc..e9ecf35 100644
--- a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
+++ b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
@@ -65,8 +65,8 @@ public class ZkDatasetStateStore extends ZkStateStore<JobState.DatasetState>
       }});
 
     Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
-    if (!previousDatasetStates.isEmpty()) {
-      JobState.DatasetState previousDatasetState = previousDatasetStates.get(0);
+
+    for (JobState.DatasetState previousDatasetState : previousDatasetStates) {
       datasetStatesByUrns.put(previousDatasetState.getDatasetUrn(), previousDatasetState);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c35f76e4/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
index 742aa98..1091cf7 100644
--- a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
+++ b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
@@ -44,6 +44,7 @@ public class ZkDatasetStateStoreTest {
   private static final String TEST_JOB_ID = "TestJob1";
   private static final String TEST_TASK_ID_PREFIX = "TestTask-";
   private static final String TEST_DATASET_URN = "TestDataset";
+  private static final String TEST_DATASET_URN2 = "TestDataset2";
 
   private TestingServer testingServer;
   private StateStore<JobState> zkJobStateStore;
@@ -142,6 +143,13 @@ public class ZkDatasetStateStoreTest {
     }
 
     zkDatasetStateStore.persistDatasetState(TEST_DATASET_URN, datasetState);
+
+    // persist a second dataset state to test that retrieval of multiple dataset states works
+    datasetState.setDatasetUrn(TEST_DATASET_URN2);
+    datasetState.setId(TEST_DATASET_URN2);
+    datasetState.setDuration(2000);
+
+    zkDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState);
   }
 
   @Test(dependsOnMethods = "testPersistDatasetState")
@@ -171,7 +179,7 @@ public class ZkDatasetStateStoreTest {
   public void testGetPreviousDatasetStatesByUrns() throws IOException {
     Map<String, JobState.DatasetState> datasetStatesByUrns =
         zkDatasetStateStore.getLatestDatasetStatesByUrns(TEST_JOB_NAME);
-    Assert.assertEquals(datasetStatesByUrns.size(), 1);
+    Assert.assertEquals(datasetStatesByUrns.size(), 2);
 
     JobState.DatasetState datasetState = datasetStatesByUrns.get(TEST_DATASET_URN);
     Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN);
@@ -181,6 +189,15 @@ public class ZkDatasetStateStoreTest {
     Assert.assertEquals(datasetState.getStartTime(), this.startTime);
     Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
     Assert.assertEquals(datasetState.getDuration(), 1000);
+
+    datasetState = datasetStatesByUrns.get(TEST_DATASET_URN2);
+    Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN2);
+    Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
+    Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
+    Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
+    Assert.assertEquals(datasetState.getStartTime(), this.startTime);
+    Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
+    Assert.assertEquals(datasetState.getDuration(), 2000);
   }
 
   @Test(dependsOnMethods = "testGetPreviousDatasetStatesByUrns")

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c35f76e4/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
index 400e52c..741ac07 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
@@ -69,8 +69,8 @@ public class MysqlDatasetStateStore extends MysqlStateStore<JobState.DatasetStat
         getAll(jobName, "%" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX, true);
 
     Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
-    if (!previousDatasetStates.isEmpty()) {
-      JobState.DatasetState previousDatasetState = previousDatasetStates.get(0);
+
+    for (JobState.DatasetState previousDatasetState : previousDatasetStates) {
       datasetStatesByUrns.put(previousDatasetState.getDatasetUrn(), previousDatasetState);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c35f76e4/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
index 9c35610..86ba8ba 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
@@ -46,6 +46,7 @@ public class MysqlDatasetStateStoreTest {
   private static final String TEST_JOB_ID = "TestJob1";
   private static final String TEST_TASK_ID_PREFIX = "TestTask-";
   private static final String TEST_DATASET_URN = "TestDataset";
+  private static final String TEST_DATASET_URN2 = "TestDataset2";
 
   private StateStore<JobState> dbJobStateStore;
   private DatasetStateStore<JobState.DatasetState> dbDatasetStateStore;
@@ -154,6 +155,13 @@ public class MysqlDatasetStateStoreTest {
     }
 
     dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN, datasetState);
+
+    // persist a second dataset state to test that retrieval of multiple dataset states works
+    datasetState.setDatasetUrn(TEST_DATASET_URN2);
+    datasetState.setId(TEST_DATASET_URN2);
+    datasetState.setDuration(2000);
+
+    dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState);
   }
 
   @Test(dependsOnMethods = "testPersistDatasetState")
@@ -183,7 +191,7 @@ public class MysqlDatasetStateStoreTest {
   public void testGetPreviousDatasetStatesByUrns() throws IOException {
     Map<String, JobState.DatasetState> datasetStatesByUrns =
         dbDatasetStateStore.getLatestDatasetStatesByUrns(TEST_JOB_NAME);
-    Assert.assertEquals(datasetStatesByUrns.size(), 1);
+    Assert.assertEquals(datasetStatesByUrns.size(), 2);
 
     JobState.DatasetState datasetState = datasetStatesByUrns.get(TEST_DATASET_URN);
     Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN);
@@ -193,6 +201,15 @@ public class MysqlDatasetStateStoreTest {
     Assert.assertEquals(datasetState.getStartTime(), this.startTime);
     Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
     Assert.assertEquals(datasetState.getDuration(), 1000);
+
+    datasetState = datasetStatesByUrns.get(TEST_DATASET_URN2);
+    Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN2);
+    Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
+    Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
+    Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
+    Assert.assertEquals(datasetState.getStartTime(), this.startTime);
+    Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
+    Assert.assertEquals(datasetState.getDuration(), 2000);
   }
 
   @Test(dependsOnMethods = "testGetPreviousDatasetStatesByUrns")