You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/08/14 17:35:56 UTC
incubator-gobblin git commit: [GOBBLIN-199] API for listing the
contents of a state store
Repository: incubator-gobblin
Updated Branches:
refs/heads/master ee91502e6 -> 998fe200d
[GOBBLIN-199] API for listing the contents of a state store
Closes #2051 from ibuenros/state-store-listapi
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/998fe200
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/998fe200
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/998fe200
Branch: refs/heads/master
Commit: 998fe200de1cb74e6797f119752206687a74dcbb
Parents: ee91502
Author: ibuenros <is...@gmail.com>
Authored: Mon Aug 14 10:35:41 2017 -0700
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Mon Aug 14 10:35:41 2017 -0700
----------------------------------------------------------------------
.../apache/gobblin/dataset/DatasetsFinder.java | 1 +
.../gobblin/metastore/DatasetStateStore.java | 64 ++++++++++++++
.../gobblin/metastore/DatasetStoreDataset.java | 56 ++++++++++++
.../metastore/DatasetStoreDatasetFinder.java | 93 ++++++++++++++++++++
.../apache/gobblin/metastore/StateStore.java | 19 ++++
.../metadata/DatasetStateStoreEntryManager.java | 60 +++++++++++++
.../metadata/StateStoreEntryManager.java | 63 +++++++++++++
.../metastore/predicates/DatasetPredicate.java | 55 ++++++++++++
.../predicates/StateStorePredicate.java | 43 +++++++++
.../predicates/StoreNamePredicate.java | 44 +++++++++
.../gobblin/runtime/FsDatasetStateStore.java | 60 +++++++++++--
.../FsDatasetStateStoreEntryManager.java | 51 +++++++++++
.../runtime/FsDatasetStateStoreTest.java | 66 ++++++++++++++
.../gobblin/runtime/JobLauncherTestHelper.java | 14 +--
.../runtime/commit/CommitSequenceTest.java | 2 +-
15 files changed, 679 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java
index 27a4dae..44fea79 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java
@@ -43,5 +43,6 @@ public interface DatasetsFinder<T extends Dataset> {
/**
* @return The deepest common root shared by all {@link Dataset}s root paths returned by this finder.
*/
+ @Deprecated
public Path commonDatasetRoot();
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java
index 16b3442..91d8db7 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java
@@ -19,16 +19,30 @@ package org.apache.gobblin.metastore;
import java.io.IOException;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import com.google.common.base.Strings;
import com.typesafe.config.Config;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+
+import lombok.Getter;
+
public interface DatasetStateStore<T extends State> extends StateStore<T> {
String DATASET_STATE_STORE_TABLE_SUFFIX = ".jst";
String CURRENT_DATASET_STATE_FILE_SUFFIX = "current";
+ Pattern TABLE_NAME_PARSER_PATTERN = Pattern.compile("^(?:(.+)-)?([^-]+)\\.jst$");
+
interface Factory {
<T extends State> DatasetStateStore<T> createStateStore(Config config);
}
@@ -40,4 +54,54 @@ public interface DatasetStateStore<T extends State> extends StateStore<T> {
public void persistDatasetState(String datasetUrn, T datasetState) throws IOException;
public void persistDatasetURNs(String storeName, Collection<String> datasetUrns) throws IOException;
+
+  /**
+   * Narrows the return type of {@link StateStore#getMetadataForTables(StateStorePredicate)} to
+   * {@link DatasetStateStoreEntryManager}s.
+   *
+   * <p>This default implementation does not list anything: it always throws. The message mirrors the one
+   * used by the {@link StateStore} default so both failures read the same in logs.</p>
+   *
+   * @param predicate predicate used to filter tables.
+   * @return never returns normally in this default implementation.
+   * @throws IOException declared for implementations; not thrown by this default.
+   * @throws UnsupportedOperationException always, in this default implementation.
+   */
+  @Override
+  default List<? extends DatasetStateStoreEntryManager> getMetadataForTables(StateStorePredicate predicate)
+      throws IOException {
+    // Null-safe so that a null predicate still surfaces as UnsupportedOperationException, as before.
+    throw new UnsupportedOperationException("Operation unsupported for predicate with class "
+        + (predicate == null ? null : predicate.getClass()));
+  }
+
+ /**
+ * Maps a dataset urn to the sanitized form that {@link #buildTableName} embeds in table names.
+ * This default implementation returns {@code datasetURN} unchanged and does not use {@code storeName}.
+ *
+ * @param storeName name of the store the dataset belongs to.
+ * @param datasetURN the dataset urn to sanitize.
+ * @return the sanitized dataset urn.
+ * @throws IOException declared for implementations; not thrown by this default.
+ */
+ default String sanitizeDatasetStatestoreNameFromDatasetURN(String storeName, String datasetURN) throws IOException {
+ return datasetURN;
+ }
+
+  /**
+   * Builds the name of the table holding a dataset state.
+   *
+   * @param store store whose sanitization rule is applied to {@code datasetUrn}.
+   * @param storeName name of the store, forwarded to the sanitization call.
+   * @param stateId identifier of the state.
+   * @param datasetUrn dataset urn; when null or empty the table name has no dataset prefix.
+   * @return {@code <stateId>.jst}, or {@code <sanitized urn>-<stateId>.jst} when a dataset urn is given.
+   * @throws IOException if sanitizing the dataset urn fails.
+   */
+  static String buildTableName(DatasetStateStore store, String storeName, String stateId, String datasetUrn) throws IOException {
+    String stateTable = stateId + DATASET_STATE_STORE_TABLE_SUFFIX;
+    if (Strings.isNullOrEmpty(datasetUrn)) {
+      return stateTable;
+    }
+    return store.sanitizeDatasetStatestoreNameFromDatasetURN(storeName, datasetUrn) + "-" + stateTable;
+  }
+
+  /**
+   * Splits a table name of the form {@code [<sanitized dataset urn>-]<state id>.jst} into its parts using
+   * {@link #TABLE_NAME_PARSER_PATTERN}. The sanitized dataset urn is {@code null} when the name has no prefix.
+   */
+  @Getter
+  class TableNameParser {
+    private final String sanitizedDatasetUrn;
+    private final String stateId;
+
+    /**
+     * @param tableName the table name to parse.
+     * @throws IllegalArgumentException if {@code tableName} does not match the expected format.
+     */
+    public TableNameParser(String tableName) {
+      Matcher parsed = TABLE_NAME_PARSER_PATTERN.matcher(tableName);
+      if (!parsed.matches()) {
+        throw new IllegalArgumentException("Cannot parse table name " + tableName);
+      }
+      this.sanitizedDatasetUrn = parsed.group(1);
+      this.stateId = parsed.group(2);
+    }
+  }
+
+  /**
+   * Creates the {@link DatasetStateStore} selected by {@link ConfigurationKeys#STATE_STORE_TYPE_KEY}, falling
+   * back to {@link ConfigurationKeys#DEFAULT_STATE_STORE_TYPE} when the key is not set.
+   *
+   * @param config configuration naming the store type; it is also handed to the resolved {@link Factory}.
+   * @return the store created by the resolved factory.
+   * @throws IOException wrapping any checked exception raised while resolving or instantiating the factory.
+   */
+  static DatasetStateStore buildDatasetStateStore(Config config) throws IOException {
+    ClassAliasResolver<Factory> aliasResolver = new ClassAliasResolver<>(DatasetStateStore.Factory.class);
+    String storeType = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_TYPE_KEY,
+        ConfigurationKeys.DEFAULT_STATE_STORE_TYPE);
+
+    try {
+      return aliasResolver.resolveClass(storeType).newInstance().createStateStore(config);
+    } catch (RuntimeException unchecked) {
+      // Unchecked failures propagate untouched; only checked ones are translated below.
+      throw unchecked;
+    } catch (Exception checked) {
+      throw new IOException(checked);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java
new file mode 100644
index 0000000..666cb56
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.util.List;
+
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+
+import lombok.Data;
+
+
+/**
+ * A {@link Dataset} representing a group of entries in a {@link DatasetStateStore} with the same dataset urn.
+ */
+@Data
+public class DatasetStoreDataset implements Dataset {
+
+ /** Identifies the group: the store name plus the sanitized dataset urn. */
+ private final Key key;
+ /** The state store entries belonging to this group. */
+ private final List<DatasetStateStoreEntryManager> datasetStateStoreMetadataEntries;
+
+ /**
+ * @return the store name and the sanitized dataset urn of {@link #key}, joined by {@code ":::"}.
+ */
+ @Override
+ public String datasetURN() {
+ return this.key.getStoreName() + ":::" + this.key.getSanitizedDatasetUrn();
+ }
+
+ /**
+ * The key for a {@link DatasetStoreDataset}.
+ */
+ @Data
+ public static class Key {
+ private final String storeName;
+ private final String sanitizedDatasetUrn;
+
+ /** Copies the store name and the sanitized dataset urn from {@code metadata}. */
+ public Key(DatasetStateStoreEntryManager metadata) {
+ this.storeName = metadata.getStoreName();
+ this.sanitizedDatasetUrn = metadata.getSanitizedDatasetUrn();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java
new file mode 100644
index 0000000..75d083d
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import org.apache.gobblin.metastore.predicates.DatasetPredicate;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
+import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+
+/**
+ * A {@link DatasetsFinder} to find {@link DatasetStoreDataset}s.
+ *
+ * <p>Entries are read from the {@link DatasetStateStore} built from the supplied properties and can be
+ * narrowed with {@link #STORE_NAME_FILTER} and {@link #DATASET_URN_FILTER}.</p>
+ */
+public class DatasetStoreDatasetFinder implements DatasetsFinder<DatasetStoreDataset> {
+
+  public static final String STORE_NAME_FILTER = "datasetStoreDatasetFinder.filter.storeName";
+  public static final String DATASET_URN_FILTER = "datasetStoreDatasetFinder.filter.datasetUrn";
+
+  private final Config config;
+  private final DatasetStateStore store;
+  private final StateStorePredicate predicate;
+
+  /**
+   * @param fs not used by this finder.
+   * @param props properties selecting the state store and, optionally, the filters.
+   * @throws IOException if the state store cannot be built.
+   */
+  public DatasetStoreDatasetFinder(FileSystem fs, Properties props) throws IOException {
+    this.config = ConfigFactory.parseProperties(props);
+    this.store = DatasetStateStore.buildDatasetStateStore(this.config);
+    this.predicate = buildPredicate();
+  }
+
+  /**
+   * Picks the most specific predicate the configured filters allow: dataset urn, then store name,
+   * then match-everything.
+   */
+  private StateStorePredicate buildPredicate() {
+    String storeName =
+        ConfigUtils.hasNonEmptyPath(this.config, STORE_NAME_FILTER) ? this.config.getString(STORE_NAME_FILTER) : null;
+
+    if (ConfigUtils.hasNonEmptyPath(this.config, DATASET_URN_FILTER)) {
+      if (storeName == null) {
+        throw new IllegalArgumentException(DATASET_URN_FILTER + " requires " + STORE_NAME_FILTER + " to also be defined.");
+      }
+      return new DatasetPredicate(storeName, this.config.getString(DATASET_URN_FILTER), x -> true);
+    }
+
+    if (storeName != null) {
+      return new StoreNamePredicate(storeName, x -> true);
+    }
+    return new StateStorePredicate(x -> true);
+  }
+
+  /**
+   * @return one {@link DatasetStoreDataset} per distinct store name / sanitized dataset urn pair among the
+   *         entries matching the predicate.
+   * @throws IOException if the state store cannot be listed.
+   */
+  @Override
+  public List<DatasetStoreDataset> findDatasets() throws IOException {
+    List<DatasetStateStoreEntryManager> matchingEntries = this.store.getMetadataForTables(this.predicate);
+
+    Map<DatasetStoreDataset.Key, List<DatasetStateStoreEntryManager>> byDataset =
+        matchingEntries.stream().collect(Collectors.groupingBy(DatasetStoreDataset.Key::new));
+
+    return byDataset.entrySet().stream()
+        .map(group -> new DatasetStoreDataset(group.getKey(), group.getValue()))
+        .collect(Collectors.toList());
+  }
+
+  /** @return always {@code null}; this finder does not report a common root. */
+  @Override
+  public Path commonDatasetRoot() {
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
index 578c94a..46c2aa8 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
@@ -24,6 +24,8 @@ import java.util.Collection;
import java.util.List;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
/**
@@ -35,6 +37,11 @@ import org.apache.gobblin.configuration.State;
* {@link State#getId()}).
* </p>
*
+ * <p>
+ * Note: Implementations of dataset store should maintain a timestamp for every state they persist. Certain utilities
+ * will not work if this is not the case.
+ * </p>
+ *
* @param <T> state object type
*
* @author Yinan Li
@@ -193,4 +200,16 @@ public interface StateStore<T extends State> {
*/
public void delete(String storeName)
throws IOException;
+
+ /**
+ * Gets metadata for all tables matching the input predicate.
+ *
+ * <p>
+ * This default implementation does not list anything; it always throws {@link UnsupportedOperationException}.
+ * </p>
+ *
+ * @param predicate Predicate used to filter tables. To allow state stores to push down predicates, use native extensions
+ * of {@link StateStorePredicate}.
+ * @return A list of all {@link StateStoreEntryManager}s matching the predicate.
+ * @throws IOException declared for implementations; not thrown by this default.
+ * @throws UnsupportedOperationException always, in this default implementation.
+ */
+ default List<? extends StateStoreEntryManager> getMetadataForTables(StateStorePredicate predicate)
+ throws IOException {
+ throw new UnsupportedOperationException("Operation unsupported for predicate with class " + predicate.getClass());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
new file mode 100644
index 0000000..32bc851
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore.metadata;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.DatasetStateStore;
+
+import lombok.Getter;
+
+
+/**
+ * A {@link StateStoreEntryManager} in a {@link DatasetStateStore}.
+ */
+@Getter
+public abstract class DatasetStateStoreEntryManager<T extends State> extends StateStoreEntryManager<T> {
+
+ /**
+ * The sanitized dataset urn. Sanitization usually involves a one-way function on the dataset urn, so the actual
+ * urn cannot be determined except by {@link #readState()}.
+ */
+ private final String sanitizedDatasetUrn;
+ /**
+ * An identifier for the state. Usually a job id or "current" for the latest state for that dataset.
+ */
+ private final String stateId;
+ /** The same store that is passed to the superclass, kept here with its {@link DatasetStateStore} type. */
+ private final DatasetStateStore datasetStateStore;
+
+ /**
+ * Convenience constructor taking the sanitized dataset urn and the state id from an already parsed table name.
+ */
+ public DatasetStateStoreEntryManager(String storeName, String tableName, long timestamp,
+ DatasetStateStore.TableNameParser tableNameParser, DatasetStateStore datasetStateStore) {
+ this(storeName, tableName, timestamp, tableNameParser.getSanitizedDatasetUrn(), tableNameParser.getStateId(), datasetStateStore);
+ }
+
+ /**
+ * @param storeName name of the store containing the entry.
+ * @param tableName name of the table containing the entry.
+ * @param timestamp timestamp recorded for the entry.
+ * @param sanitizedDatasetUrn sanitized urn of the dataset the entry belongs to.
+ * @param stateId identifier of the state.
+ * @param datasetStateStore store where the entry exists.
+ */
+ public DatasetStateStoreEntryManager(String storeName, String tableName, long timestamp, String sanitizedDatasetUrn,
+ String stateId, DatasetStateStore datasetStateStore) {
+ super(storeName, tableName, timestamp, datasetStateStore);
+ this.sanitizedDatasetUrn = sanitizedDatasetUrn;
+ this.stateId = stateId;
+ this.datasetStateStore = datasetStateStore;
+ }
+
+ /**
+ * @return the store of this entry, typed as {@link DatasetStateStore} rather than the supertype's declared type.
+ */
+ @Override
+ public DatasetStateStore getStateStore() {
+ return this.datasetStateStore;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
new file mode 100644
index 0000000..b2fb04c
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore.metadata;
+
+import java.io.IOException;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+
+
+import lombok.Data;
+
+
+/**
+ * Contains metadata about an entry in a {@link StateStore}.
+ *
+ * <p>Lombok's {@code @Data} supplies the constructor and the getters for the store name, table name and
+ * state store. The timestamp getter is written by hand so that it can reject unreliable values.</p>
+ *
+ * @param <T> type of {@link State} that can be read from this entry.
+ */
+@Data
+public abstract class StateStoreEntryManager<T extends State> {
+
+  private final String storeName;
+  private final String tableName;
+  /** Timestamp at which the state was written. */
+  private final long timestamp;
+
+  /** {@link StateStore} where this entry exists. */
+  private final StateStore stateStore;
+
+  /**
+   * Returns the timestamp at which the state was written.
+   *
+   * <p>This accessor must be public: Lombok does not generate a getter when a method with the same name
+   * already exists, so a private declaration would leave the timestamp unreadable by callers.</p>
+   *
+   * @return the timestamp recorded for this entry.
+   * @throws RuntimeException if the recorded timestamp is zero or negative, i.e. the store did not provide
+   *         a usable one.
+   */
+  public final long getTimestamp() {
+    if (this.timestamp <= 0) {
+      throw new RuntimeException("Timestamp is not reliable.");
+    }
+    return this.timestamp;
+  }
+
+  /**
+   * @return The {@link State} contained in this entry. This operation should be lazy.
+   * @throws IOException
+   */
+  public abstract T readState() throws IOException;
+
+  /**
+   * Delete this entry in the {@link StateStore}.
+   * @throws IOException
+   */
+  public abstract void delete() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/DatasetPredicate.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/DatasetPredicate.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/DatasetPredicate.java
new file mode 100644
index 0000000..3e8cb62
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/DatasetPredicate.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore.predicates;
+
+import java.io.IOException;
+
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
+
+import com.google.common.base.Predicate;
+
+
+/**
+ * A {@link StateStorePredicate} used to select only entries from a {@link org.apache.gobblin.metastore.DatasetStateStore}
+ * with the provided dataset urn.
+ */
+public class DatasetPredicate extends StoreNamePredicate {
+
+  private final String datasetUrn;
+
+  /**
+   * @param storeName store name the entries must have.
+   * @param datasetUrn unsanitized dataset urn the entries must belong to.
+   * @param customPredicate additional filter applied to each entry.
+   */
+  public DatasetPredicate(String storeName, String datasetUrn, Predicate<StateStoreEntryManager> customPredicate) {
+    super(storeName, customPredicate);
+    this.datasetUrn = datasetUrn;
+  }
+
+  /**
+   * Accepts {@link DatasetStateStoreEntryManager}s that pass the store name check and whose sanitized dataset
+   * urn equals this predicate's dataset urn after sanitization by the entry's own store.
+   *
+   * @throws RuntimeException wrapping an {@link IOException} raised while sanitizing the dataset urn.
+   */
+  @Override
+  public boolean apply(StateStoreEntryManager input) {
+    if (!(input instanceof DatasetStateStoreEntryManager)) {
+      return false;
+    }
+    if (!super.apply(input)) {
+      return false;
+    }
+
+    DatasetStateStoreEntryManager entry = (DatasetStateStoreEntryManager) input;
+    try {
+      // The urn is sanitized by the entry's store so that it is comparable with the stored, sanitized form.
+      String expectedSanitizedUrn =
+          entry.getStateStore().sanitizeDatasetStatestoreNameFromDatasetURN(getStoreName(), this.datasetUrn);
+      return expectedSanitizedUrn.equals(entry.getSanitizedDatasetUrn());
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StateStorePredicate.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StateStorePredicate.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StateStorePredicate.java
new file mode 100644
index 0000000..789414c
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StateStorePredicate.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore.predicates;
+
+import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
+
+import com.google.common.base.Predicate;
+
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Delegate;
+
+
+/**
+ * A {@link Predicate} used to filter entries in a {@link org.apache.gobblin.metastore.StateStore}.
+ *
+ * <p>
+ * {@link org.apache.gobblin.metastore.StateStore}s can usually partially push down extensions of this class, so it
+ * is recommended to use bundled {@link StateStorePredicate} extensions as much as possible.
+ * </p>
+ */
+@RequiredArgsConstructor
+public class StateStorePredicate implements Predicate<StateStoreEntryManager> {
+
+ /**
+ * An additional {@link Predicate} for filtering. This predicate is never pushed down.
+ * Lombok's {@code @Delegate} forwards this class's {@link Predicate} methods to it, so unless a subclass
+ * overrides {@code apply}, an entry is accepted exactly when this predicate accepts it.
+ */
+ @Delegate
+ private final Predicate<StateStoreEntryManager> customPredicate;
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StoreNamePredicate.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StoreNamePredicate.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StoreNamePredicate.java
new file mode 100644
index 0000000..2b1043d
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StoreNamePredicate.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore.predicates;
+
+import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
+
+import com.google.common.base.Predicate;
+
+import lombok.Getter;
+
+
+/**
+ * A {@link StateStorePredicate} to select only entries with a specific {@link #storeName}.
+ */
+public class StoreNamePredicate extends StateStorePredicate {
+
+  /** The store name an entry must have to be accepted. */
+  @Getter
+  private final String storeName;
+
+  /**
+   * @param storeName store name the entries must have.
+   * @param customPredicate additional filter applied to entries whose store name matches.
+   */
+  public StoreNamePredicate(String storeName, Predicate<StateStoreEntryManager> customPredicate) {
+    super(customPredicate);
+    this.storeName = storeName;
+  }
+
+  /**
+   * @return true if the entry's store name equals {@link #storeName} and the custom predicate accepts it.
+   */
+  @Override
+  public boolean apply(StateStoreEntryManager input) {
+    // The store name is checked first; the custom predicate is consulted only on a match.
+    if (!input.getStoreName().equals(this.storeName)) {
+      return false;
+    }
+    return super.apply(input);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
index 0e99dae..fa35921 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
@@ -27,7 +27,13 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
+import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
+import org.apache.gobblin.runtime.metastore.filesystem.FsDatasetStateStoreEntryManager;
+import org.apache.gobblin.util.filters.HiddenFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -158,7 +164,8 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
this.useTmpFileForPut = false;
}
- private String santinizeDatasetStatestoreNameFromDatasetURN(String storeName, String datasetURN)
+ @Override
+ public String sanitizeDatasetStatestoreNameFromDatasetURN(String storeName, String datasetURN)
throws IOException {
if (this.stateStoreNameParserLoadingCache == null) {
return datasetURN;
@@ -176,6 +183,11 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
@Override
public JobState.DatasetState get(String storeName, String tableName, String stateId)
throws IOException {
+ return getInternal(storeName, tableName, stateId, false);
+ }
+
+ public JobState.DatasetState getInternal(String storeName, String tableName, String stateId, boolean sanitizeKeyForComparison)
+ throws IOException {
Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
if (!this.fs.exists(tablePath)) {
@@ -193,8 +205,10 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
Text key = new Text();
while (reader.next(key)) {
+ String stringKey = sanitizeKeyForComparison ?
+ sanitizeDatasetStatestoreNameFromDatasetURN(storeName, key.toString()) : key.toString();
writable = reader.getCurrentValue(writable);
- if (key.toString().equals(stateId)) {
+ if (stringKey.equals(stateId)) {
if (writable instanceof JobState.DatasetState) {
return (JobState.DatasetState) writable;
}
@@ -333,7 +347,7 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
String alias =
Strings.isNullOrEmpty(datasetUrn) ? CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX
- : santinizeDatasetStatestoreNameFromDatasetURN(storeName, datasetUrn) + "-"
+ : sanitizeDatasetStatestoreNameFromDatasetURN(storeName, datasetUrn) + "-"
+ CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX;
return get(storeName, alias, datasetUrn);
}
@@ -351,9 +365,9 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
String jobId = datasetState.getJobId();
datasetUrn = CharMatcher.is(':').replaceFrom(datasetUrn, '.');
- String datasetStatestoreName = santinizeDatasetStatestoreNameFromDatasetURN(jobName, datasetUrn);
- String tableName = Strings.isNullOrEmpty(datasetUrn) ? jobId + DATASET_STATE_STORE_TABLE_SUFFIX
- : datasetStatestoreName + "-" + jobId + DATASET_STATE_STORE_TABLE_SUFFIX;
+ String datasetStatestoreName = sanitizeDatasetStatestoreNameFromDatasetURN(jobName, datasetUrn);
+ String tableName = Strings.isNullOrEmpty(datasetUrn) ? sanitizeJobId(jobId) + DATASET_STATE_STORE_TABLE_SUFFIX
+ : datasetStatestoreName + "-" + sanitizeJobId(jobId) + DATASET_STATE_STORE_TABLE_SUFFIX;
LOGGER.info("Persisting " + tableName + " to the job state store");
put(jobName, tableName, datasetState);
createAlias(jobName, tableName, getAliasName(datasetStatestoreName));
@@ -367,6 +381,10 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
}
}
+ private String sanitizeJobId(String jobId) {
+ return jobId.replaceAll("[-/]", "_");
+ }
+
@Override
public void persistDatasetURNs(String storeName, Collection<String> datasetUrns)
throws IOException {
@@ -385,4 +403,34 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
+ DATASET_STATE_STORE_TABLE_SUFFIX
: datasetStatestoreName + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX;
}
+
+ @Override
+ public List<FsDatasetStateStoreEntryManager> getMetadataForTables(StateStorePredicate predicate)
+ throws IOException {
+
+ Stream<Path> stores = predicate instanceof StoreNamePredicate ?
+ Stream.of(new Path(this.storeRootDir, ((StoreNamePredicate) predicate).getStoreName())) :
+ lsStream(new Path(this.storeRootDir)).map(FileStatus::getPath);
+
+ if (stores == null) {
+ return Lists.newArrayList();
+ }
+
+ Stream<FileStatus> tables = stores.flatMap(this::lsStream);
+
+ return tables.map(this::parseMetadataFromPath).filter(predicate::apply).collect(Collectors.toList());
+ }
+
+ private Stream<FileStatus> lsStream(Path path) {
+ try {
+ FileStatus[] ls = this.fs.listStatus(path, new HiddenFilter());
+ return ls == null ? Stream.empty() : Arrays.stream(ls);
+ } catch (IOException ioe) {
+ return Stream.empty();
+ }
+ }
+
+ private FsDatasetStateStoreEntryManager parseMetadataFromPath(FileStatus status) {
+ return new FsDatasetStateStoreEntryManager(status, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/filesystem/FsDatasetStateStoreEntryManager.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/filesystem/FsDatasetStateStoreEntryManager.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/filesystem/FsDatasetStateStoreEntryManager.java
new file mode 100644
index 0000000..a06bd65
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/filesystem/FsDatasetStateStoreEntryManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.metastore.filesystem;
+
+import java.io.IOException;
+
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import org.apache.gobblin.runtime.FsDatasetStateStore;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.hadoop.fs.FileStatus;
+
+
+/**
+ * A {@link DatasetStateStoreEntryManager} generated by {@link FsDatasetStateStore}.
+ */
+public class FsDatasetStateStoreEntryManager extends DatasetStateStoreEntryManager<JobState.DatasetState> {
+
+ private final FsDatasetStateStore stateStore;
+
+ public FsDatasetStateStoreEntryManager(FileStatus fileStatus, FsDatasetStateStore stateStore) {
+ super(fileStatus.getPath().getParent().getName(), fileStatus.getPath().getName(), fileStatus.getModificationTime(),
+ new DatasetStateStore.TableNameParser(fileStatus.getPath().getName()), stateStore);
+ this.stateStore = stateStore;
+ }
+
+ @Override
+ public JobState.DatasetState readState() throws IOException {
+ return this.stateStore.getInternal(getStoreName(), getTableName(), getSanitizedDatasetUrn(), true);
+ }
+
+ @Override
+ public void delete() throws IOException {
+ this.stateStore.delete(getStoreName(), getTableName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java
index 8aa9ca8..1ff6a07 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java
@@ -17,9 +17,17 @@
package org.apache.gobblin.runtime;
+import java.io.File;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import org.apache.gobblin.metastore.predicates.DatasetPredicate;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
+import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
+import org.apache.gobblin.runtime.metastore.filesystem.FsDatasetStateStoreEntryManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -33,6 +41,8 @@ import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.StateStore;
+import com.google.common.io.Files;
+
/**
* Unit tests for {@link FsDatasetStateStore}.
@@ -171,6 +181,62 @@ public class FsDatasetStateStoreTest {
Assert.assertEquals(datasetState.getDuration(), 1000);
}
+ @Test
+ public void testGetMetadataForTables() throws Exception {
+
+ File tmpDir = Files.createTempDir();
+ tmpDir.deleteOnExit();
+
+ FsDatasetStateStore store = new FsDatasetStateStore(FileSystem.getLocal(new Configuration()), tmpDir.getAbsolutePath());
+
+ JobState.DatasetState dataset2State = new JobState.DatasetState("job1", "job1_id2");
+ dataset2State.setDatasetUrn("dataset2");
+ dataset2State.setId("dataset2");
+ TaskState taskState = new TaskState();
+ taskState.setJobId("job1_id2");
+ taskState.setTaskId("task123");
+ taskState.setProp("key", "value");
+ dataset2State.addTaskState(taskState);
+
+ store.persistDatasetState("dataset1", new JobState.DatasetState("job1", "job1_id1"));
+ store.persistDatasetState("dataset1", new JobState.DatasetState("job1", "job1_id2"));
+ store.persistDatasetState("dataset2", dataset2State);
+ store.persistDatasetState("dataset1", new JobState.DatasetState("job2", "job2_id1"));
+ store.persistDatasetState("", new JobState.DatasetState("job3", "job3_id1"));
+
+ List<FsDatasetStateStoreEntryManager> metadataList = store.getMetadataForTables(new StateStorePredicate(x -> true));
+
+ // 5 explicitly stored states, plus 4 current links, one per job-dataset
+ Assert.assertEquals(metadataList.size(), 9);
+
+ metadataList = store.getMetadataForTables(new StoreNamePredicate("job1", x-> true));
+ // 3 explicitly stored states, plus 2 current links, one per dataset
+ Assert.assertEquals(metadataList.size(), 5);
+
+ metadataList = store.getMetadataForTables(new DatasetPredicate("job1", "dataset1", x -> true));
+ Assert.assertEquals(metadataList.size(), 3);
+
+ metadataList = store.getMetadataForTables(new DatasetPredicate("job1", "dataset2", meta ->
+ ((DatasetStateStoreEntryManager) meta).getStateId().equals(DatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX)
+ ));
+ Assert.assertEquals(metadataList.size(), 1);
+ DatasetStateStoreEntryManager metadata = metadataList.get(0);
+ Assert.assertEquals(metadata.getStoreName(), "job1");
+ Assert.assertEquals(metadata.getSanitizedDatasetUrn(), "dataset2");
+ Assert.assertEquals(metadata.getStateId(), DatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX);
+ Assert.assertEquals(metadata.getDatasetStateStore(), store);
+
+ JobState.DatasetState readState = (JobState.DatasetState) metadata.readState();
+ TaskState readTaskState = readState.getTaskStates().get(0);
+ Assert.assertEquals(readTaskState.getProp("key"), "value");
+ metadata.delete();
+ // verify it got deleted
+ metadataList = store.getMetadataForTables(new DatasetPredicate("job1", "dataset2", meta ->
+ ((DatasetStateStoreEntryManager) meta).getStateId().equals(DatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX)
+ ));
+ Assert.assertTrue(metadataList.isEmpty());
+ }
+
@AfterClass
public void tearDown() throws IOException {
FileSystem fs = FileSystem.getLocal(new Configuration(false));
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
index 068d357..5a3c631 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
@@ -79,7 +79,7 @@ public class JobLauncherTestHelper {
Assert.assertTrue(jobMetricContextTags.contains(ClusterNameTags.CLUSTER_IDENTIFIER_TAG_NAME),
ClusterNameTags.CLUSTER_IDENTIFIER_TAG_NAME + " tag missing in job metric context tags.");
- List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst");
+ List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
DatasetState datasetState = datasetStateList.get(0);
Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
@@ -106,7 +106,7 @@ public class JobLauncherTestHelper {
closer.close();
}
- List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst");
+ List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
DatasetState datasetState = datasetStateList.get(0);
Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
@@ -151,7 +151,7 @@ public class JobLauncherTestHelper {
closer.close();
}
- List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst");
+ List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
Assert.assertTrue(datasetStateList.isEmpty());
}
@@ -164,7 +164,7 @@ public class JobLauncherTestHelper {
jobLauncher.launchJob(null);
}
- List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst");
+ List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
DatasetState datasetState = datasetStateList.get(0);
Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
@@ -239,7 +239,7 @@ public class JobLauncherTestHelper {
closer.close();
}
- List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst");
+ List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
JobState jobState = datasetStateList.get(0);
Assert.assertEquals(jobState.getState(), JobState.RunningState.COMMITTED);
@@ -355,4 +355,8 @@ public class JobLauncherTestHelper {
return extractor;
}
}
+
+ private String sanitizeJobNameForDatasetStore(String jobId) {
+ return jobId.replaceAll("[-/]", "_");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java
index 213f1e4..141b48c 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java
@@ -93,7 +93,7 @@ public class CommitSequenceTest {
Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "dir1/file2")));
Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "dir2/file1")));
- Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "store/job-name/urn-job-id.jst")));
+ Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "store/job-name/urn-job_id.jst")));
Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "store/job-name/urn-current.jst")));
}