You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/08/14 17:35:56 UTC

incubator-gobblin git commit: [GOBBLIN-199] API for listing the contents of a state store

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ee91502e6 -> 998fe200d


[GOBBLIN-199] API for listing the contents of a state store

Closes #2051 from ibuenros/state-store-listapi


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/998fe200
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/998fe200
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/998fe200

Branch: refs/heads/master
Commit: 998fe200de1cb74e6797f119752206687a74dcbb
Parents: ee91502
Author: ibuenros <is...@gmail.com>
Authored: Mon Aug 14 10:35:41 2017 -0700
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Mon Aug 14 10:35:41 2017 -0700

----------------------------------------------------------------------
 .../apache/gobblin/dataset/DatasetsFinder.java  |  1 +
 .../gobblin/metastore/DatasetStateStore.java    | 64 ++++++++++++++
 .../gobblin/metastore/DatasetStoreDataset.java  | 56 ++++++++++++
 .../metastore/DatasetStoreDatasetFinder.java    | 93 ++++++++++++++++++++
 .../apache/gobblin/metastore/StateStore.java    | 19 ++++
 .../metadata/DatasetStateStoreEntryManager.java | 60 +++++++++++++
 .../metadata/StateStoreEntryManager.java        | 63 +++++++++++++
 .../metastore/predicates/DatasetPredicate.java  | 55 ++++++++++++
 .../predicates/StateStorePredicate.java         | 43 +++++++++
 .../predicates/StoreNamePredicate.java          | 44 +++++++++
 .../gobblin/runtime/FsDatasetStateStore.java    | 60 +++++++++++--
 .../FsDatasetStateStoreEntryManager.java        | 51 +++++++++++
 .../runtime/FsDatasetStateStoreTest.java        | 66 ++++++++++++++
 .../gobblin/runtime/JobLauncherTestHelper.java  | 14 +--
 .../runtime/commit/CommitSequenceTest.java      |  2 +-
 15 files changed, 679 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java
index 27a4dae..44fea79 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java
@@ -43,5 +43,6 @@ public interface DatasetsFinder<T extends Dataset> {
   /**
    * @return The deepest common root shared by all {@link Dataset}s root paths returned by this finder.
    */
+  @Deprecated
   public Path commonDatasetRoot();
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java
index 16b3442..91d8db7 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java
@@ -19,16 +19,30 @@ package org.apache.gobblin.metastore;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import com.google.common.base.Strings;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+
+import lombok.Getter;
+
 
 public interface DatasetStateStore<T extends State> extends StateStore<T> {
   String DATASET_STATE_STORE_TABLE_SUFFIX = ".jst";
   String CURRENT_DATASET_STATE_FILE_SUFFIX = "current";
 
+  Pattern TABLE_NAME_PARSER_PATTERN = Pattern.compile("^(?:(.+)-)?([^-]+)\\.jst$");
+
   interface Factory {
     <T extends State> DatasetStateStore<T> createStateStore(Config config);
   }
@@ -40,4 +54,54 @@ public interface DatasetStateStore<T extends State> extends StateStore<T> {
   public void persistDatasetState(String datasetUrn, T datasetState) throws IOException;
 
   public void persistDatasetURNs(String storeName, Collection<String> datasetUrns) throws IOException;
+
+  @Override
+  default List<? extends DatasetStateStoreEntryManager> getMetadataForTables(StateStorePredicate predicate)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  default String sanitizeDatasetStatestoreNameFromDatasetURN(String storeName, String datasetURN) throws IOException {
+    return datasetURN;
+  }
+
+  static String buildTableName(DatasetStateStore store, String storeName, String stateId, String datasetUrn) throws IOException {
+    return Strings.isNullOrEmpty(datasetUrn) ? stateId + DATASET_STATE_STORE_TABLE_SUFFIX
+        : store.sanitizeDatasetStatestoreNameFromDatasetURN(storeName,datasetUrn) + "-" + stateId + DATASET_STATE_STORE_TABLE_SUFFIX;
+  }
+
+  @Getter
+  class TableNameParser {
+    private final String sanitizedDatasetUrn;
+    private final String stateId;
+
+    public TableNameParser(String tableName) {
+      Matcher matcher = TABLE_NAME_PARSER_PATTERN.matcher(tableName);
+      if (matcher.matches()) {
+        this.sanitizedDatasetUrn = matcher.group(1);
+        this.stateId = matcher.group(2);
+      } else {
+        throw new IllegalArgumentException("Cannot parse table name " + tableName);
+      }
+    }
+  }
+
+  static DatasetStateStore buildDatasetStateStore(Config config) throws IOException {
+    ClassAliasResolver<Factory> resolver =
+        new ClassAliasResolver<>(DatasetStateStore.Factory.class);
+
+    String stateStoreType = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_TYPE_KEY,
+        ConfigurationKeys.DEFAULT_STATE_STORE_TYPE);
+
+    try {
+      DatasetStateStore.Factory stateStoreFactory =
+          resolver.resolveClass(stateStoreType).newInstance();
+
+      return stateStoreFactory.createStateStore(config);
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java
new file mode 100644
index 0000000..666cb56
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.util.List;
+
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+
+import lombok.Data;
+
+
+/**
+ * A {@link Dataset} representing a group of entries in a {@link DatasetStateStore} with the same dataset urn.
+ */
+@Data
+public class DatasetStoreDataset implements Dataset {
+
+  private final Key key;
+  private final List<DatasetStateStoreEntryManager> datasetStateStoreMetadataEntries;
+
+  @Override
+  public String datasetURN() {
+    return this.key.getStoreName() + ":::" + this.key.getSanitizedDatasetUrn();
+  }
+
+  /**
+   * The key for a {@link DatasetStoreDataset}.
+   */
+  @Data
+  public static class Key {
+    private final String storeName;
+    private final String sanitizedDatasetUrn;
+
+    public Key(DatasetStateStoreEntryManager metadata) {
+      this.storeName = metadata.getStoreName();
+      this.sanitizedDatasetUrn = metadata.getSanitizedDatasetUrn();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java
new file mode 100644
index 0000000..75d083d
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import org.apache.gobblin.metastore.predicates.DatasetPredicate;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
+import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+
+/**
+ * A {@link DatasetsFinder} to find {@link DatasetStoreDataset}s.
+ */
+public class DatasetStoreDatasetFinder implements DatasetsFinder<DatasetStoreDataset> {
+
+  public static final String STORE_NAME_FILTER = "datasetStoreDatasetFinder.filter.storeName";
+  public static final String DATASET_URN_FILTER = "datasetStoreDatasetFinder.filter.datasetUrn";
+
+  private final Config config;
+  private final DatasetStateStore store;
+  private final StateStorePredicate predicate;
+
+  public DatasetStoreDatasetFinder(FileSystem fs, Properties props) throws IOException {
+    this.config = ConfigFactory.parseProperties(props);
+    this.store = DatasetStateStore.buildDatasetStateStore(this.config);
+    this.predicate = buildPredicate();
+  }
+
+  private StateStorePredicate buildPredicate() {
+    StateStorePredicate predicate= null;
+    String storeName = null;
+    String datasetUrn;
+
+    if (ConfigUtils.hasNonEmptyPath(this.config, STORE_NAME_FILTER)) {
+      storeName = this.config.getString(STORE_NAME_FILTER);
+      predicate = new StoreNamePredicate(storeName, x -> true);
+    }
+
+    if (ConfigUtils.hasNonEmptyPath(this.config, DATASET_URN_FILTER)) {
+      if (storeName == null) {
+        throw new IllegalArgumentException(DATASET_URN_FILTER + " requires " + STORE_NAME_FILTER + " to also be defined.");
+      }
+      datasetUrn = this.config.getString(DATASET_URN_FILTER);
+      predicate = new DatasetPredicate(storeName, datasetUrn, x -> true);
+    }
+
+    return predicate == null ? new StateStorePredicate(x -> true) : predicate;
+  }
+
+  @Override
+  public List<DatasetStoreDataset> findDatasets() throws IOException {
+    List<DatasetStateStoreEntryManager> entries = this.store.getMetadataForTables(this.predicate);
+
+    Map<DatasetStoreDataset.Key, List<DatasetStateStoreEntryManager>> entriesGroupedByDataset =
+        entries.stream().collect(Collectors.groupingBy(DatasetStoreDataset.Key::new));
+
+    return entriesGroupedByDataset.entrySet().stream().
+        map(entry -> new DatasetStoreDataset(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+  }
+
+  @Override
+  public Path commonDatasetRoot() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
index 578c94a..46c2aa8 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
@@ -24,6 +24,8 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
 
 
 /**
@@ -35,6 +37,11 @@ import org.apache.gobblin.configuration.State;
  *     {@link State#getId()}).
  * </p>
  *
+ * <p>
+ *   Note: Implementations of dataset store should maintain a timestamp for every state they persist. Certain utilities
+ *   will not work if this is not the case.
+ * </p>
+ *
  * @param <T> state object type
  *
  * @author Yinan Li
@@ -193,4 +200,16 @@ public interface StateStore<T extends State> {
    */
   public void delete(String storeName)
       throws IOException;
+
+  /**
+   * Gets metadata for all tables matching the input predicate.
+   * @param predicate Predicate used to filter tables. To allow state stores to push down predicates, use native extensions
+   *                  of {@link StateStorePredicate}.
+   * @return A list of all {@link StateStoreEntryManager}s matching the predicate.
+   * @throws IOException
+   */
+  default List<? extends StateStoreEntryManager> getMetadataForTables(StateStorePredicate predicate)
+      throws IOException {
+    throw new UnsupportedOperationException("Operation unsupported for predicate with class " + predicate.getClass());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
new file mode 100644
index 0000000..32bc851
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore.metadata;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.DatasetStateStore;
+
+import lombok.Getter;
+
+
+/**
+ * A {@link StateStoreEntryManager} in a {@link DatasetStateStore}.
+ */
+@Getter
+public abstract class DatasetStateStoreEntryManager<T extends State> extends StateStoreEntryManager<T> {
+
+  /**
+   * The sanitized dataset urn. Sanitization usually involves a one-way function on the dataset urn, so the actual
+   * urn cannot be determined except by {@link #readState()}.
+   */
+  private final String sanitizedDatasetUrn;
+  /**
+   * An identifier for the state. Usually a job id or "current" for the latest state for that dataset.
+   */
+  private final String stateId;
+  private final DatasetStateStore datasetStateStore;
+
+  public DatasetStateStoreEntryManager(String storeName, String tableName, long timestamp,
+      DatasetStateStore.TableNameParser tableNameParser, DatasetStateStore datasetStateStore) {
+    this(storeName, tableName, timestamp, tableNameParser.getSanitizedDatasetUrn(), tableNameParser.getStateId(), datasetStateStore);
+  }
+
+  public DatasetStateStoreEntryManager(String storeName, String tableName, long timestamp, String sanitizedDatasetUrn,
+      String stateId, DatasetStateStore datasetStateStore) {
+    super(storeName, tableName, timestamp, datasetStateStore);
+    this.sanitizedDatasetUrn = sanitizedDatasetUrn;
+    this.stateId = stateId;
+    this.datasetStateStore = datasetStateStore;
+  }
+
+  @Override
+  public DatasetStateStore getStateStore() {
+    return this.datasetStateStore;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
new file mode 100644
index 0000000..b2fb04c
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore.metadata;
+
+import java.io.IOException;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+
+
+import lombok.Data;
+
+
+/**
+ * Contains metadata about an entry in a {@link StateStore}.
+ * @param <T> type of {@link State} that can be read from this entry.
+ */
+@Data
+public abstract class StateStoreEntryManager<T extends State> {
+
+  private final String storeName;
+  private final String tableName;
+  /** Timestamp at which the state was written. */
+  private final long timestamp;
+
+  /** {@link StateStore} where this entry exists. */
+  private final StateStore stateStore;
+
+  public final long getTimestamp() { // public: Lombok @Data skips its own getter because this method exists
+    if (this.timestamp <= 0) { // a non-positive timestamp is treated as "not recorded by the store"
+      throw new RuntimeException("Timestamp is not reliable.");
+    }
+    return this.timestamp;
+  }
+
+  /**
+   * @return The {@link State} contained in this entry. This operation should be lazy.
+   * @throws IOException
+   */
+  public abstract T readState() throws IOException;
+
+  /**
+   * Delete this entry in the {@link StateStore}.
+   * @throws IOException
+   */
+  public abstract void delete() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/DatasetPredicate.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/DatasetPredicate.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/DatasetPredicate.java
new file mode 100644
index 0000000..3e8cb62
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/DatasetPredicate.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore.predicates;
+
+import java.io.IOException;
+
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
+
+import com.google.common.base.Predicate;
+
+
+/**
+ * A {@link StateStorePredicate} used to select only entries from a {@link org.apache.gobblin.metastore.DatasetStateStore}
+ * with the provided dataset urn.
+ */
+public class DatasetPredicate extends StoreNamePredicate {
+
+  private final String datasetUrn;
+
+  public DatasetPredicate(String storeName, String datasetUrn, Predicate<StateStoreEntryManager> customPredicate) {
+    super(storeName, customPredicate);
+    this.datasetUrn = datasetUrn;
+  }
+
+  @Override
+  public boolean apply(StateStoreEntryManager input) {
+    if (!(input instanceof DatasetStateStoreEntryManager)) {
+      return false;
+    }
+    DatasetStateStoreEntryManager datasetStateStoreEntryMetadata = (DatasetStateStoreEntryManager) input;
+    try {
+      return super.apply(input) && datasetStateStoreEntryMetadata.getStateStore().
+          sanitizeDatasetStatestoreNameFromDatasetURN(getStoreName(), this.datasetUrn).
+          equals(((DatasetStateStoreEntryManager) input).getSanitizedDatasetUrn());
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StateStorePredicate.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StateStorePredicate.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StateStorePredicate.java
new file mode 100644
index 0000000..789414c
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StateStorePredicate.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore.predicates;
+
+import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
+
+import com.google.common.base.Predicate;
+
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Delegate;
+
+
+/**
+ * A {@link Predicate} used to filter entries in a {@link org.apache.gobblin.metastore.StateStore}.
+ *
+ * {@link org.apache.gobblin.metastore.StateStore}s can usually partially push down extensions of this class, so it
+ * is recommended to use bundled {@link StateStorePredicate} extensions as much as possible.
+ */
+@RequiredArgsConstructor
+public class StateStorePredicate implements Predicate<StateStoreEntryManager> {
+
+  /**
+   * An additional {@link Predicate} for filtering. This predicate is never pushed down.
+   */
+  @Delegate
+  private final Predicate<StateStoreEntryManager> customPredicate;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StoreNamePredicate.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StoreNamePredicate.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StoreNamePredicate.java
new file mode 100644
index 0000000..2b1043d
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StoreNamePredicate.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore.predicates;
+
+import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
+
+import com.google.common.base.Predicate;
+
+import lombok.Getter;
+
+
+/**
+ * A {@link StateStorePredicate} to select only entries with a specific {@link #storeName}.
+ */
+public class StoreNamePredicate extends StateStorePredicate {
+
+  @Getter
+  private final String storeName;
+
+  public StoreNamePredicate(String storeName, Predicate<StateStoreEntryManager> customPredicate) {
+    super(customPredicate);
+    this.storeName = storeName;
+  }
+
+  @Override
+  public boolean apply(StateStoreEntryManager input) {
+    return input.getStoreName().equals(this.storeName) && super.apply(input);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
index 0e99dae..fa35921 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
@@ -27,7 +27,13 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
+import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
+import org.apache.gobblin.runtime.metastore.filesystem.FsDatasetStateStoreEntryManager;
+import org.apache.gobblin.util.filters.HiddenFilter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -158,7 +164,8 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
     this.useTmpFileForPut = false;
   }
 
-  private String santinizeDatasetStatestoreNameFromDatasetURN(String storeName, String datasetURN)
+  @Override
+  public String sanitizeDatasetStatestoreNameFromDatasetURN(String storeName, String datasetURN)
       throws IOException {
     if (this.stateStoreNameParserLoadingCache == null) {
       return datasetURN;
@@ -176,6 +183,11 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
   @Override
   public JobState.DatasetState get(String storeName, String tableName, String stateId)
       throws IOException {
+    return getInternal(storeName, tableName, stateId, false);
+  }
+
+  public JobState.DatasetState getInternal(String storeName, String tableName, String stateId, boolean sanitizeKeyForComparison)
+      throws IOException {
     Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
 
     if (!this.fs.exists(tablePath)) {
@@ -193,8 +205,10 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
         Text key = new Text();
 
         while (reader.next(key)) {
+          String stringKey = sanitizeKeyForComparison ?
+              sanitizeDatasetStatestoreNameFromDatasetURN(storeName, key.toString()) : key.toString();
           writable = reader.getCurrentValue(writable);
-          if (key.toString().equals(stateId)) {
+          if (stringKey.equals(stateId)) {
             if (writable instanceof JobState.DatasetState) {
               return (JobState.DatasetState) writable;
             }
@@ -333,7 +347,7 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
 
     String alias =
         Strings.isNullOrEmpty(datasetUrn) ? CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX
-            : santinizeDatasetStatestoreNameFromDatasetURN(storeName, datasetUrn) + "-"
+            : sanitizeDatasetStatestoreNameFromDatasetURN(storeName, datasetUrn) + "-"
                 + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX;
     return get(storeName, alias, datasetUrn);
   }
@@ -351,9 +365,9 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
     String jobId = datasetState.getJobId();
 
     datasetUrn = CharMatcher.is(':').replaceFrom(datasetUrn, '.');
-    String datasetStatestoreName = santinizeDatasetStatestoreNameFromDatasetURN(jobName, datasetUrn);
-    String tableName = Strings.isNullOrEmpty(datasetUrn) ? jobId + DATASET_STATE_STORE_TABLE_SUFFIX
-        : datasetStatestoreName + "-" + jobId + DATASET_STATE_STORE_TABLE_SUFFIX;
+    String datasetStatestoreName = sanitizeDatasetStatestoreNameFromDatasetURN(jobName, datasetUrn);
+    String tableName = Strings.isNullOrEmpty(datasetUrn) ? sanitizeJobId(jobId) + DATASET_STATE_STORE_TABLE_SUFFIX
+        : datasetStatestoreName + "-" + sanitizeJobId(jobId) + DATASET_STATE_STORE_TABLE_SUFFIX;
     LOGGER.info("Persisting " + tableName + " to the job state store");
     put(jobName, tableName, datasetState);
     createAlias(jobName, tableName, getAliasName(datasetStatestoreName));
@@ -367,6 +381,10 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
     }
   }
 
+  private String sanitizeJobId(String jobId) { // used by persistDatasetState when building the table name
+    return jobId.replaceAll("[-/]", "_"); // '-' is the separator placed before the job id in table names; presumably '/' is replaced because it is a path separator
+  }
+
   @Override
   public void persistDatasetURNs(String storeName, Collection<String> datasetUrns)
       throws IOException {
@@ -385,4 +403,34 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
         + DATASET_STATE_STORE_TABLE_SUFFIX
         : datasetStatestoreName + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX;
   }
+
+  @Override
+  public List<FsDatasetStateStoreEntryManager> getMetadataForTables(StateStorePredicate predicate)
+      throws IOException {
+
+    Stream<Path> stores = predicate instanceof StoreNamePredicate ? // a store-name predicate narrows the listing to that single store directory
+        Stream.of(new Path(this.storeRootDir, ((StoreNamePredicate) predicate).getStoreName())) :
+        lsStream(new Path(this.storeRootDir)).map(FileStatus::getPath); // otherwise: every child of the root accepted by HiddenFilter
+
+    if (stores == null) { // NOTE(review): neither branch above can produce null, so this guard looks unreachable
+      return Lists.newArrayList();
+    }
+
+    Stream<FileStatus> tables = stores.flatMap(this::lsStream); // files listed under each store directory
+
+    return tables.map(this::parseMetadataFromPath).filter(predicate::apply).collect(Collectors.toList()); // the predicate is applied to the parsed entry, not the raw FileStatus
+  }
+
+  private Stream<FileStatus> lsStream(Path path) { // lists path as a stream; never returns null
+    try {
+      FileStatus[] ls = this.fs.listStatus(path, new HiddenFilter()); // HiddenFilter decides which children are kept
+      return ls == null ? Stream.empty() : Arrays.stream(ls); // a null listing is treated as empty
+    } catch (IOException ioe) { // NOTE(review): swallowed without logging, so an unlistable path is indistinguishable from an empty one
+      return Stream.empty();
+    }
+  }
+
+  private FsDatasetStateStoreEntryManager parseMetadataFromPath(FileStatus status) {
+    return new FsDatasetStateStoreEntryManager(status, this); // the entry keeps a reference to this store for its readState()/delete() calls
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/filesystem/FsDatasetStateStoreEntryManager.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/filesystem/FsDatasetStateStoreEntryManager.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/filesystem/FsDatasetStateStoreEntryManager.java
new file mode 100644
index 0000000..a06bd65
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/filesystem/FsDatasetStateStoreEntryManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.metastore.filesystem;
+
+import java.io.IOException;
+
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import org.apache.gobblin.runtime.FsDatasetStateStore;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.hadoop.fs.FileStatus;
+
+
+/**
+ * A {@link DatasetStateStoreEntryManager} generated by {@link FsDatasetStateStore}.
+ */
+public class FsDatasetStateStoreEntryManager extends DatasetStateStoreEntryManager<JobState.DatasetState> {
+
+  private final FsDatasetStateStore stateStore; // store that readState() and delete() delegate to
+
+  public FsDatasetStateStoreEntryManager(FileStatus fileStatus, FsDatasetStateStore stateStore) {
+    super(fileStatus.getPath().getParent().getName(), fileStatus.getPath().getName(), fileStatus.getModificationTime(), // parent directory name, file name, modification time
+        new DatasetStateStore.TableNameParser(fileStatus.getPath().getName()), stateStore); // the file name is also handed to TableNameParser
+    this.stateStore = stateStore;
+  }
+
+  @Override
+  public JobState.DatasetState readState() throws IOException {
+    return this.stateStore.getInternal(getStoreName(), getTableName(), getSanitizedDatasetUrn(), true); // true: stored keys are sanitized before being compared to the sanitized URN
+  }
+
+  @Override
+  public void delete() throws IOException {
+    this.stateStore.delete(getStoreName(), getTableName()); // removes this entry's table from its store
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java
index 8aa9ca8..1ff6a07 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java
@@ -17,9 +17,17 @@
 
 package org.apache.gobblin.runtime;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import org.apache.gobblin.metastore.predicates.DatasetPredicate;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
+import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
+import org.apache.gobblin.runtime.metastore.filesystem.FsDatasetStateStoreEntryManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,6 +41,8 @@ import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.metastore.StateStore;
 
+import com.google.common.io.Files;
+
 
 /**
  * Unit tests for {@link FsDatasetStateStore}.
@@ -171,6 +181,62 @@ public class FsDatasetStateStoreTest {
     Assert.assertEquals(datasetState.getDuration(), 1000);
   }
 
+  @Test
+  public void testGetMetadataForTables() throws Exception {
+
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit(); // NOTE(review): deleteOnExit cannot remove a non-empty directory, so the state files written below are likely left behind
+
+    FsDatasetStateStore store = new FsDatasetStateStore(FileSystem.getLocal(new Configuration()), tmpDir.getAbsolutePath());
+
+    JobState.DatasetState dataset2State = new JobState.DatasetState("job1", "job1_id2"); // the only state given a task state; read back further down
+    dataset2State.setDatasetUrn("dataset2");
+    dataset2State.setId("dataset2");
+    TaskState taskState = new TaskState();
+    taskState.setJobId("job1_id2");
+    taskState.setTaskId("task123");
+    taskState.setProp("key", "value");
+    dataset2State.addTaskState(taskState);
+
+    store.persistDatasetState("dataset1", new JobState.DatasetState("job1", "job1_id1"));
+    store.persistDatasetState("dataset1", new JobState.DatasetState("job1", "job1_id2"));
+    store.persistDatasetState("dataset2", dataset2State);
+    store.persistDatasetState("dataset1", new JobState.DatasetState("job2", "job2_id1"));
+    store.persistDatasetState("", new JobState.DatasetState("job3", "job3_id1")); // empty dataset URN
+
+    List<FsDatasetStateStoreEntryManager> metadataList = store.getMetadataForTables(new StateStorePredicate(x -> true));
+
+    // 5 explicitly stored states, plus 4 current links, one per job-dataset
+    Assert.assertEquals(metadataList.size(), 9);
+
+    metadataList = store.getMetadataForTables(new StoreNamePredicate("job1", x-> true));
+    // 3 explicitly stored states, plus 2 current links, one per dataset
+    Assert.assertEquals(metadataList.size(), 5);
+
+    metadataList = store.getMetadataForTables(new DatasetPredicate("job1", "dataset1", x -> true));
+    Assert.assertEquals(metadataList.size(), 3); // 2 stored states for job1/dataset1 plus its current link
+
+    metadataList = store.getMetadataForTables(new DatasetPredicate("job1", "dataset2", meta ->
+      ((DatasetStateStoreEntryManager) meta).getStateId().equals(DatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX)
+    ));
+    Assert.assertEquals(metadataList.size(), 1); // only the current link of job1/dataset2 matches
+    DatasetStateStoreEntryManager metadata = metadataList.get(0);
+    Assert.assertEquals(metadata.getStoreName(), "job1");
+    Assert.assertEquals(metadata.getSanitizedDatasetUrn(), "dataset2");
+    Assert.assertEquals(metadata.getStateId(), DatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX);
+    Assert.assertEquals(metadata.getDatasetStateStore(), store);
+
+    JobState.DatasetState readState = (JobState.DatasetState) metadata.readState(); // the state must be readable through the entry itself
+    TaskState readTaskState = readState.getTaskStates().get(0);
+    Assert.assertEquals(readTaskState.getProp("key"), "value");
+    metadata.delete();
+    // verify it got deleted
+    metadataList = store.getMetadataForTables(new DatasetPredicate("job1", "dataset2", meta ->
+        ((DatasetStateStoreEntryManager) meta).getStateId().equals(DatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX)
+    ));
+    Assert.assertTrue(metadataList.isEmpty());
+  }
+
   @AfterClass
   public void tearDown() throws IOException {
     FileSystem fs = FileSystem.getLocal(new Configuration(false));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
index 068d357..5a3c631 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
@@ -79,7 +79,7 @@ public class JobLauncherTestHelper {
     Assert.assertTrue(jobMetricContextTags.contains(ClusterNameTags.CLUSTER_IDENTIFIER_TAG_NAME),
         ClusterNameTags.CLUSTER_IDENTIFIER_TAG_NAME + " tag missing in job metric context tags.");
 
-    List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst");
+    List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
     DatasetState datasetState = datasetStateList.get(0);
 
     Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
@@ -106,7 +106,7 @@ public class JobLauncherTestHelper {
       closer.close();
     }
 
-    List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst");
+    List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
     DatasetState datasetState = datasetStateList.get(0);
 
     Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
@@ -151,7 +151,7 @@ public class JobLauncherTestHelper {
       closer.close();
     }
 
-    List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst");
+    List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
     Assert.assertTrue(datasetStateList.isEmpty());
   }
 
@@ -164,7 +164,7 @@ public class JobLauncherTestHelper {
       jobLauncher.launchJob(null);
     }
 
-    List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst");
+    List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
     DatasetState datasetState = datasetStateList.get(0);
 
     Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
@@ -239,7 +239,7 @@ public class JobLauncherTestHelper {
       closer.close();
     }
 
-    List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst");
+    List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
     JobState jobState = datasetStateList.get(0);
 
     Assert.assertEquals(jobState.getState(), JobState.RunningState.COMMITTED);
@@ -355,4 +355,8 @@ public class JobLauncherTestHelper {
       return extractor;
     }
   }
+
+  private String sanitizeJobNameForDatasetStore(String jobId) { // despite the name, callers in this helper pass a job id
+    return jobId.replaceAll("[-/]", "_"); // same replacement as FsDatasetStateStore.sanitizeJobId; the two must stay in sync for table lookups to match
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java
index 213f1e4..141b48c 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java
@@ -93,7 +93,7 @@ public class CommitSequenceTest {
 
     Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "dir1/file2")));
     Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "dir2/file1")));
-    Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "store/job-name/urn-job-id.jst")));
+    Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "store/job-name/urn-job_id.jst")));
     Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "store/job-name/urn-current.jst")));
   }