You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/01/30 01:09:14 UTC
incubator-gobblin git commit: [GOBBLIN-392] Load all dataset states when getLatestDatasetStatesByUrns() is called
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 378ccaa8a -> c35f76e4e
[GOBBLIN-392] Load all dataset states when getLatestDatasetStatesByUrns() is called
Closes #2268 from htran1/fix_dataset_state_fetch
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c35f76e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c35f76e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c35f76e4
Branch: refs/heads/master
Commit: c35f76e4e2a0f9f25580924d46cef1b732af7d63
Parents: 378ccaa
Author: Hung Tran <hu...@linkedin.com>
Authored: Mon Jan 29 17:08:46 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Jan 29 17:08:46 2018 -0800
----------------------------------------------------------------------
.../gobblin/runtime/ZkDatasetStateStore.java | 4 ++--
.../gobblin/runtime/ZkDatasetStateStoreTest.java | 19 ++++++++++++++++++-
.../gobblin/runtime/MysqlDatasetStateStore.java | 4 ++--
.../runtime/MysqlDatasetStateStoreTest.java | 19 ++++++++++++++++++-
4 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c35f76e4/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
index dbde3fc..e9ecf35 100644
--- a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
+++ b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
@@ -65,8 +65,8 @@ public class ZkDatasetStateStore extends ZkStateStore<JobState.DatasetState>
}});
Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
- if (!previousDatasetStates.isEmpty()) {
- JobState.DatasetState previousDatasetState = previousDatasetStates.get(0);
+
+ for (JobState.DatasetState previousDatasetState : previousDatasetStates) {
datasetStatesByUrns.put(previousDatasetState.getDatasetUrn(), previousDatasetState);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c35f76e4/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
index 742aa98..1091cf7 100644
--- a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
+++ b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
@@ -44,6 +44,7 @@ public class ZkDatasetStateStoreTest {
private static final String TEST_JOB_ID = "TestJob1";
private static final String TEST_TASK_ID_PREFIX = "TestTask-";
private static final String TEST_DATASET_URN = "TestDataset";
+ private static final String TEST_DATASET_URN2 = "TestDataset2";
private TestingServer testingServer;
private StateStore<JobState> zkJobStateStore;
@@ -142,6 +143,13 @@ public class ZkDatasetStateStoreTest {
}
zkDatasetStateStore.persistDatasetState(TEST_DATASET_URN, datasetState);
+
+ // persist a second dataset state to test that retrieval of multiple dataset states works
+ datasetState.setDatasetUrn(TEST_DATASET_URN2);
+ datasetState.setId(TEST_DATASET_URN2);
+ datasetState.setDuration(2000);
+
+ zkDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState);
}
@Test(dependsOnMethods = "testPersistDatasetState")
@@ -171,7 +179,7 @@ public class ZkDatasetStateStoreTest {
public void testGetPreviousDatasetStatesByUrns() throws IOException {
Map<String, JobState.DatasetState> datasetStatesByUrns =
zkDatasetStateStore.getLatestDatasetStatesByUrns(TEST_JOB_NAME);
- Assert.assertEquals(datasetStatesByUrns.size(), 1);
+ Assert.assertEquals(datasetStatesByUrns.size(), 2);
JobState.DatasetState datasetState = datasetStatesByUrns.get(TEST_DATASET_URN);
Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN);
@@ -181,6 +189,15 @@ public class ZkDatasetStateStoreTest {
Assert.assertEquals(datasetState.getStartTime(), this.startTime);
Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
Assert.assertEquals(datasetState.getDuration(), 1000);
+
+ datasetState = datasetStatesByUrns.get(TEST_DATASET_URN2);
+ Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN2);
+ Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
+ Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
+ Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
+ Assert.assertEquals(datasetState.getStartTime(), this.startTime);
+ Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
+ Assert.assertEquals(datasetState.getDuration(), 2000);
}
@Test(dependsOnMethods = "testGetPreviousDatasetStatesByUrns")
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c35f76e4/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
index 400e52c..741ac07 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
@@ -69,8 +69,8 @@ public class MysqlDatasetStateStore extends MysqlStateStore<JobState.DatasetStat
getAll(jobName, "%" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX, true);
Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
- if (!previousDatasetStates.isEmpty()) {
- JobState.DatasetState previousDatasetState = previousDatasetStates.get(0);
+
+ for (JobState.DatasetState previousDatasetState : previousDatasetStates) {
datasetStatesByUrns.put(previousDatasetState.getDatasetUrn(), previousDatasetState);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c35f76e4/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
index 9c35610..86ba8ba 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
@@ -46,6 +46,7 @@ public class MysqlDatasetStateStoreTest {
private static final String TEST_JOB_ID = "TestJob1";
private static final String TEST_TASK_ID_PREFIX = "TestTask-";
private static final String TEST_DATASET_URN = "TestDataset";
+ private static final String TEST_DATASET_URN2 = "TestDataset2";
private StateStore<JobState> dbJobStateStore;
private DatasetStateStore<JobState.DatasetState> dbDatasetStateStore;
@@ -154,6 +155,13 @@ public class MysqlDatasetStateStoreTest {
}
dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN, datasetState);
+
+ // persist a second dataset state to test that retrieval of multiple dataset states works
+ datasetState.setDatasetUrn(TEST_DATASET_URN2);
+ datasetState.setId(TEST_DATASET_URN2);
+ datasetState.setDuration(2000);
+
+ dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState);
}
@Test(dependsOnMethods = "testPersistDatasetState")
@@ -183,7 +191,7 @@ public class MysqlDatasetStateStoreTest {
public void testGetPreviousDatasetStatesByUrns() throws IOException {
Map<String, JobState.DatasetState> datasetStatesByUrns =
dbDatasetStateStore.getLatestDatasetStatesByUrns(TEST_JOB_NAME);
- Assert.assertEquals(datasetStatesByUrns.size(), 1);
+ Assert.assertEquals(datasetStatesByUrns.size(), 2);
JobState.DatasetState datasetState = datasetStatesByUrns.get(TEST_DATASET_URN);
Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN);
@@ -193,6 +201,15 @@ public class MysqlDatasetStateStoreTest {
Assert.assertEquals(datasetState.getStartTime(), this.startTime);
Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
Assert.assertEquals(datasetState.getDuration(), 1000);
+
+ datasetState = datasetStatesByUrns.get(TEST_DATASET_URN2);
+ Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN2);
+ Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
+ Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
+ Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
+ Assert.assertEquals(datasetState.getStartTime(), this.startTime);
+ Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
+ Assert.assertEquals(datasetState.getDuration(), 2000);
}
@Test(dependsOnMethods = "testGetPreviousDatasetStatesByUrns")