Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/01/04 21:35:47 UTC

[GitHub] [incubator-gobblin] Will-Lo opened a new pull request #3189: [GOBBLIN-1350] Adds a CLI for handling state store reads and deletes

Will-Lo opened a new pull request #3189:
URL: https://github.com/apache/incubator-gobblin/pull/3189


   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1350
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if applicable):
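    - The old state-store CLI tool for viewing job states no longer worked with scripts, so the state operations were refactored into a `JobStateStoreCLI` class that handles reads and deletes against state stores
    - Implements a bulk delete feature requested by users: the `-d`/`--delete` option takes a file listing one job name per line and deletes the stored state of each job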
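The bulk delete reads a local file of job names, one per line, and passes each line to the state store's `delete` (see the `-d` branch of `run` quoted in the review below). A minimal sketch of that parsing step, assuming surrounding whitespace and blank lines should be ignored rather than passed through as job names; `BulkDeleteInput` and `readJobNames` are hypothetical names, not part of the PR:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: turns the contents of a bulk-delete file into job names. */
final class BulkDeleteInput {
  private BulkDeleteInput() {}

  /** Reads one job name per line, trimming whitespace and skipping blank lines. */
  static List<String> readJobNames(BufferedReader reader) throws IOException {
    List<String> jobNames = new ArrayList<>();
    String line;
    while ((line = reader.readLine()) != null) {
      String jobName = line.trim();
      if (!jobName.isEmpty()) { // a blank line would otherwise become delete("")
        jobNames.add(jobName);
      }
    }
    return jobNames;
  }

  /** Convenience overload for in-memory input, e.g. in unit tests. */
  static List<String> readJobNames(String contents) {
    try (BufferedReader reader = new BufferedReader(new StringReader(contents))) {
      return readJobNames(reader);
    } catch (IOException e) {
      // StringReader performs no real I/O, so this is not expected in practice
      throw new UncheckedIOException(e);
    }
  }
}
```

Against a real file, the reader would come from `Files.newBufferedReader(Paths.get(fileName), StandardCharsets.UTF_8)`, matching the UTF-8 decoding the PR uses.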
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   `JobStateStoreCliTest`
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
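Several of these rules (the JIRA reference, the blank line after the subject, and rules 2, 3 and 5) are mechanical enough to check automatically. A sketch of such a check, assuming the JIRA key has the `GOBBLIN-<number>` form used in this PR's title; `CommitMessageLint` is hypothetical and not part of Gobblin's tooling:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical checker for the mechanically testable commit-message rules above. */
final class CommitMessageLint {
  // ASSUMPTION: Gobblin JIRA keys look like "GOBBLIN-1350"
  private static final Pattern JIRA_KEY = Pattern.compile("GOBBLIN-\\d+");

  private CommitMessageLint() {}

  /** Returns violations; rules 4 (imperative mood) and 6 (what/why) need a human and are skipped. */
  static List<String> check(String message) {
    List<String> problems = new ArrayList<>();
    String[] lines = message.split("\r?\n", -1);
    String subject = lines[0];
    if (!JIRA_KEY.matcher(subject).find()) {
      problems.add("subject does not reference a GOBBLIN JIRA issue");
    }
    if (subject.length() > 50) {
      problems.add("subject longer than 50 characters");
    }
    if (subject.endsWith(".")) {
      problems.add("subject ends with a period");
    }
    if (lines.length > 1 && !lines[1].isEmpty()) {
      problems.add("subject not separated from body by a blank line");
    }
    for (int i = 1; i < lines.length; i++) {
      if (lines[i].length() > 72) {
        problems.add("line " + (i + 1) + " longer than 72 characters");
      }
    }
    return problems;
  }
}
```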
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] arjun4084346 commented on pull request #3189: [GOBBLIN-1350] Adds a CLI for handling state store reads and deletes

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on pull request #3189:
URL: https://github.com/apache/incubator-gobblin/pull/3189#issuecomment-757330256


   +1 LGTM





[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3189: [GOBBLIN-1350] Adds a CLI for handling state store reads and deletes

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3189:
URL: https://github.com/apache/incubator-gobblin/pull/3189#discussion_r554226622



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/cli/JobStateStoreCLI.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.cli;
+
+import com.typesafe.config.Config;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.FsStateStoreFactory;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.JobStateToJsonConverter;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JobConfigurationUtils;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Slf4j
+@Alias(value = "job-state-store", description = "View or delete JobState in state store")
+public class JobStateStoreCLI implements CliApplication {
+
+  Option sysConfigOption = Option.builder("sc").argName("system configuration file")
+      .desc("Gobblin system configuration file (required if no state store URL specified)").longOpt("sysconfig").hasArg().build();
+  Option storeUrlOption = Option.builder("u").argName("gobblin state store URL")
+      .desc("Gobblin state store root path URL (required if no sysconfig specified)").longOpt("storeurl").hasArg().build();
+  Option jobNameOption = Option.builder("n").argName("gobblin job name").desc("Gobblin job name (required for reading)").longOpt("name")
+      .hasArg().build();
+  Option jobIdOption =
+      Option.builder("i").argName("gobblin job id").desc("Gobblin job id").longOpt("id").hasArg().build();
+  Option helpOption =
+      Option.builder("h").argName("help").desc("Usage").longOpt("help").hasArg().build();
+  Option deleteOption =
+      Option.builder("d").argName("delete state").desc("Deletes a state from the state store with a job id")
+          .longOpt("delete").hasArg().build();
+
+  // For reading state store in json format
+  Option getAsJsonOption =
+      Option.builder("r").argName("read job state").desc("Converts a job state to json").longOpt("read-job-state").build();
+  Option convertAllOption =
+      Option.builder("a").desc("Whether to convert all past job states of the given job when viewing as json").longOpt("all").build();
+  Option keepConfigOption =
+      Option.builder("kc").desc("Whether to keep all configuration properties when viewing as json").longOpt("keepConfig").build();
+  Option outputToFile =
+      Option.builder("t").argName("output file name").desc("Output file name when viewing as json").longOpt("toFile").hasArg().build();
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JobStateStoreCLI.class);
+  private StateStore<? extends JobState> jobStateStore;
+
+  CommandLine initializeOptions(String[] args) {
+    Options options = new Options();
+    options.addOption(sysConfigOption);
+    options.addOption(storeUrlOption);
+    options.addOption(jobNameOption);
+    options.addOption(jobIdOption);
+    options.addOption(deleteOption);
+    options.addOption(getAsJsonOption);
+    options.addOption(convertAllOption);
+    options.addOption(keepConfigOption);
+    options.addOption(outputToFile);
+
+    CommandLine cmd = null;
+
+    try {
+      CommandLineParser parser = new DefaultParser();
+      cmd = parser.parse(options, Arrays.copyOfRange(args, 1, args.length));
+    } catch (ParseException pe) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("JobStateStoreCLI", options);
+      throw new RuntimeException(pe);
+    }
+
+    if (!cmd.hasOption(sysConfigOption.getLongOpt()) && !cmd.hasOption(storeUrlOption.getLongOpt()) ){
+      System.out.println("State store configuration or state store url options missing");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("JobStateStoreCLI", options);
+      return null;
+    }
+
+    if (cmd.hasOption(getAsJsonOption.getOpt()) && !cmd.hasOption(jobNameOption.getOpt())) {
+      System.out.println("Job name option missing for reading job states as json");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("JobStateStoreCLI", options);
+      return null;
+    }
+
+    if (cmd.hasOption(helpOption.getOpt())) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("JobStateStoreCLI", options);
+      return null;
+    }
+
+    return cmd;
+  }
+
+
+  @Override
+  public void run(String[] args) throws Exception {
+    CommandLine cmd = initializeOptions(args);
+    if (cmd == null) {
+      return; // incorrect args were called
+    }
+
+    Properties props = new Properties();
+
+    if (cmd.hasOption(sysConfigOption.getOpt())) {
+      props = JobConfigurationUtils.fileToProperties(cmd.getOptionValue(sysConfigOption.getOpt()));
+    }
+
+    String storeUrl = cmd.getOptionValue(storeUrlOption.getLongOpt());
+    if (StringUtils.isNotBlank(storeUrl)) {
+      props.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, storeUrl);
+    }
+
+    Config stateStoreConfig = ConfigUtils.propertiesToConfig(props);
+    ClassAliasResolver<StateStore.Factory> resolver =
+        new ClassAliasResolver<>(StateStore.Factory.class);
+    StateStore.Factory stateStoreFactory;
+
+    try {
+      stateStoreFactory = resolver.resolveClass(ConfigUtils.getString(stateStoreConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY,
+          FsStateStoreFactory.class.getName())).newInstance();
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+    this.jobStateStore = stateStoreFactory.createStateStore(stateStoreConfig, JobState.class);
+
+    if (cmd.hasOption(getAsJsonOption.getOpt())) {
+      this.viewStateAsJson(cmd);
+    } else if (cmd.hasOption(deleteOption.getOpt())) {
+      Path filePath = new Path(cmd.getOptionValue(deleteOption.getOpt()));

Review comment:
       Let's move the delete logic into a separate method.
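The delete branch of `run` could be extracted as suggested. A minimal sketch, assuming a failure on one job should be reported and skipped rather than abort the whole bulk delete (the per-job `try` in the diff points that way, though its `catch` is not quoted). `JobStateStoreOps` is a hypothetical one-method stand-in for the `this.jobStateStore.delete(jobName)` call in the delete branch, and `JobStateDeleter` is likewise illustrative:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the single state-store call the delete path needs. */
interface JobStateStoreOps {
  /** Deletes everything stored under storeName (here, the job name). */
  void delete(String storeName) throws IOException;
}

/** Sketch of the delete branch of run(), pulled out into its own method. */
final class JobStateDeleter {
  private final JobStateStoreOps store;

  JobStateDeleter(JobStateStoreOps store) {
    this.store = store;
  }

  /** Deletes the state of each job and returns the names whose deletion failed. */
  List<String> deleteJobStates(List<String> jobNames) {
    List<String> failed = new ArrayList<>();
    for (String jobName : jobNames) {
      System.out.println("Deleting " + jobName);
      try {
        store.delete(jobName);
      } catch (IOException e) {
        // Keep going so one bad entry does not block the rest of the bulk delete
        System.err.println("Failed to delete " + jobName + ": " + e.getMessage());
        failed.add(jobName);
      }
    }
    return failed;
  }
}
```

Returning the failed names keeps the method easy to unit-test without capturing console output.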

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/cli/JobStateStoreCLI.java
##########
@@ -0,0 +1,184 @@
+    if (cmd.hasOption(getAsJsonOption.getOpt())) {
+      this.viewStateAsJson(cmd);
+    } else if (cmd.hasOption(deleteOption.getOpt())) {
+      Path filePath = new Path(cmd.getOptionValue(deleteOption.getOpt()));
+      try (BufferedReader br = new BufferedReader(new InputStreamReader(
+          new FileInputStream(filePath.toString()), Charset.forName("UTF-8")))) {
+        String jobName;
+        while ((jobName = br.readLine()) != null) {
+          System.out.println("Deleting " + jobName);
+          try {
+            this.jobStateStore.delete(jobName);

Review comment:
       Should we also give an option to delete based on job id (option `-i`)?
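One way to support `-i` for deletes: remove only the table for that run when a job id is given, and the whole job's state otherwise. This is a sketch, not Gobblin's API: `TableAwareStateStore` and `JobIdAwareDeleter` are hypothetical, and the `<jobId>.jst` table naming is an assumption to check against the state store in use:

```java
import java.io.IOException;

/** Hypothetical subset of a state store that can delete a whole store or one table in it. */
interface TableAwareStateStore {
  void delete(String storeName) throws IOException;                   // every run of a job
  void delete(String storeName, String tableName) throws IOException; // a single table
}

/** Sketch: honour -i/--id when deleting, otherwise delete all state for the job. */
final class JobIdAwareDeleter {
  // ASSUMPTION: each run's state lives in a table named "<jobId>.jst"; verify for your store.
  static final String JOB_STATE_TABLE_SUFFIX = ".jst";

  private final TableAwareStateStore store;

  JobIdAwareDeleter(TableAwareStateStore store) {
    this.store = store;
  }

  /** Deletes one run when jobId is non-blank, or every run of the job otherwise. */
  void delete(String jobName, String jobId) throws IOException {
    if (jobId == null || jobId.trim().isEmpty()) {
      store.delete(jobName);
    } else {
      store.delete(jobName, jobId.trim() + JOB_STATE_TABLE_SUFFIX);
    }
  }
}
```

Because the id-less path is unchanged, existing bulk-delete input files would keep their current behaviour.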







[GitHub] [incubator-gobblin] codecov-io edited a comment on pull request #3189: [GOBBLIN-1350] Adds a CLI for handling state store reads and deletes

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #3189:
URL: https://github.com/apache/incubator-gobblin/pull/3189#issuecomment-757115857


   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=h1) Report
   > Merging [#3189](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=desc) (3ee7c1b) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/92fe239355e5c0354a95c2a1ff93e7afc2f53cae?el=desc) (92fe239) will **increase** coverage by `0.02%`.
   > The diff coverage is `59.66%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3189      +/-   ##
   ============================================
   + Coverage     46.19%   46.21%   +0.02%     
   - Complexity     9721     9778      +57     
   ============================================
     Files          2006     2018      +12     
     Lines         76856    77287     +431     
     Branches       8548     8577      +29     
   ============================================
   + Hits          35500    35720     +220     
   - Misses        38060    38245     +185     
   - Partials       3296     3322      +26     
   ```
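The summary is internally consistent: in each column Hits + Misses + Partials equals Lines (35500 + 38060 + 3296 = 76856 and 35720 + 38245 + 3322 = 77287). A small sketch that re-derives the percentages, assuming Codecov computes coverage as hits / (hits + misses + partials). Since 35720 / 77287 is about 46.217%, the displayed 46.21% implies truncation rather than rounding; that is an inference from these numbers, not documented behaviour. `CoverageCheck` is a hypothetical name:

```java
/** Re-derives the Codecov summary percentages from the raw line counts. */
final class CoverageCheck {
  private CoverageCheck() {}

  /** Coverage in percent, assuming the ratio hits / (hits + misses + partials). */
  static double coveragePercent(long hits, long misses, long partials) {
    return 100.0 * hits / (hits + misses + partials);
  }

  /** Truncates (rather than rounds) to two decimals, as these figures suggest. */
  static double truncate2(double percent) {
    return Math.floor(percent * 100.0) / 100.0;
  }
}
```

With these counts, master is about 46.190% and the PR about 46.217%, so the +0.02% delta is also a truncated figure (the exact difference is about 0.027 points).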
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../gobblin/runtime/util/JobStateToJsonConverter.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvdXRpbC9Kb2JTdGF0ZVRvSnNvbkNvbnZlcnRlci5qYXZh) | `32.75% <11.11%> (+12.54%)` | `6.00 <0.00> (ø)` | |
   | [...g/apache/gobblin/runtime/cli/JobStateStoreCLI.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvY2xpL0pvYlN0YXRlU3RvcmVDTEkuamF2YQ==) | `63.63% <63.63%> (ø)` | `9.00 <9.00> (?)` | |
   | [...a/org/apache/gobblin/util/limiter/NoopLimiter.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvbGltaXRlci9Ob29wTGltaXRlci5qYXZh) | `40.00% <0.00%> (-20.00%)` | `2.00% <0.00%> (-1.00%)` | |
   | [...he/gobblin/source/PartitionAwareFileRetriever.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9QYXJ0aXRpb25Bd2FyZUZpbGVSZXRyaWV2ZXIuamF2YQ==) | `48.14% <0.00%> (-7.41%)` | `0.00% <0.00%> (ø%)` | |
   | [...lin/util/filesystem/FileSystemInstrumentation.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvZmlsZXN5c3RlbS9GaWxlU3lzdGVtSW5zdHJ1bWVudGF0aW9uLmphdmE=) | `92.85% <0.00%> (-7.15%)` | `3.00% <0.00%> (-1.00%)` | |
   | [...a/org/apache/gobblin/cluster/HelixJobsMapping.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhKb2JzTWFwcGluZy5qYXZh) | `80.32% <0.00%> (-4.92%)` | `15.00% <0.00%> (-2.00%)` | |
   | [...source/extractor/hadoop/HadoopFileInputSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvaGFkb29wL0hhZG9vcEZpbGVJbnB1dFNvdXJjZS5qYXZh) | `59.32% <0.00%> (-4.32%)` | `6.00% <0.00%> (ø%)` | |
   | [...trumented/extractor/InstrumentedExtractorBase.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vaW5zdHJ1bWVudGVkL2V4dHJhY3Rvci9JbnN0cnVtZW50ZWRFeHRyYWN0b3JCYXNlLmphdmE=) | `46.42% <0.00%> (-1.72%)` | `14.00% <0.00%> (ø%)` | |
   | [...tractor/extract/kafka/KafkaStreamingExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9rYWZrYS9LYWZrYVN0cmVhbWluZ0V4dHJhY3Rvci5qYXZh) | `58.82% <0.00%> (-1.67%)` | `20.00% <0.00%> (ø%)` | |
   | [...lin/elasticsearch/writer/FutureCallbackHolder.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tZWxhc3RpY3NlYXJjaC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9lbGFzdGljc2VhcmNoL3dyaXRlci9GdXR1cmVDYWxsYmFja0hvbGRlci5qYXZh) | `61.42% <0.00%> (-1.43%)` | `4.00% <0.00%> (ø%)` | |
   | ... and [27 more](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=footer). Last update [92fe239...3ee7c1b](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3189: [GOBBLIN-1350] Adds a CLI for handling state store reads and deletes

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3189:
URL: https://github.com/apache/incubator-gobblin/pull/3189#discussion_r554226843



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/cli/JobStateStoreCLI.java
##########
@@ -0,0 +1,184 @@
+    if (cmd.hasOption(getAsJsonOption.getOpt())) {
+      this.viewStateAsJson(cmd);
+    } else if (cmd.hasOption(deleteOption.getOpt())) {
+      Path filePath = new Path(cmd.getOptionValue(deleteOption.getOpt()));
+      try (BufferedReader br = new BufferedReader(new InputStreamReader(
+          new FileInputStream(filePath.toString()), Charset.forName("UTF-8")))) {
+        String jobName;
+        while ((jobName = br.readLine()) != null) {
+          System.out.println("Deleting " + jobName);
+          try {
+            this.jobStateStore.delete(jobName);

Review comment:
       Should we also give an option to delete based on job id (option -i)?
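A minimal sketch of what a job-id delete could look like. It assumes that the store exposes a two-argument `delete(storeName, tableName)` next to the one-argument `delete(jobName)` used in the diff, and that a single run's JobState lives in a table named `<jobId>.jst` under the job name. Both are assumptions, not confirmed by this thread. `DeleteByJobIdSketch`, `JobStateStore`, `RecordingStore` and the sample job id are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DeleteByJobIdSketch {

  /** Hypothetical stand-in mirroring the two delete overloads assumed on the state store. */
  interface JobStateStore {
    void delete(String storeName) throws IOException;                   // every state of one job
    void delete(String storeName, String tableName) throws IOException; // a single stored table
  }

  /** Test double that records each delete call instead of touching a real store. */
  static class RecordingStore implements JobStateStore {
    final List<String> calls = new ArrayList<>();

    @Override
    public void delete(String storeName) {
      calls.add("delete " + storeName);
    }

    @Override
    public void delete(String storeName, String tableName) {
      calls.add("delete " + storeName + "/" + tableName);
    }
  }

  // Assumption: one run's JobState is stored in a table named "<jobId>.jst" under the job name.
  static final String JOB_STATE_TABLE_SUFFIX = ".jst";

  /** Deletes one run's state when a job id (-i) is given, otherwise every state stored for the job. */
  static void deleteJobState(JobStateStore store, String jobName, String jobId) throws IOException {
    if (jobId == null || jobId.isEmpty()) {
      store.delete(jobName);
    } else {
      store.delete(jobName, jobId + JOB_STATE_TABLE_SUFFIX);
    }
  }

  public static void main(String[] args) throws IOException {
    RecordingStore store = new RecordingStore();
    deleteJobState(store, "myJob", "job_myJob_1609795200000"); // hypothetical job id
    deleteJobState(store, "myJob", null);
    System.out.println(store.calls);
  }
}
```

Falling back to the whole-job delete when `-i` is absent would keep the existing `-d` behaviour unchanged.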




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] codecov-io commented on pull request #3189: [GOBBLIN-1350] Adds a CLI for handling state store reads and deletes

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #3189:
URL: https://github.com/apache/incubator-gobblin/pull/3189#issuecomment-757115857


   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=h1) Report
   > Merging [#3189](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=desc) (3ee7c1b) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/92fe239355e5c0354a95c2a1ff93e7afc2f53cae?el=desc) (92fe239) will **decrease** coverage by `36.99%`.
   > The diff coverage is `59.66%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #3189       +/-   ##
   ============================================
   - Coverage     46.19%   9.19%   -37.00%     
   + Complexity     9721    1737     -7984     
   ============================================
     Files          2006    2018       +12     
     Lines         76856   77287      +431     
     Branches       8548    8577       +29     
   ============================================
   - Hits          35500    7107    -28393     
   - Misses        38060   69485    +31425     
   + Partials       3296     695     -2601     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../gobblin/runtime/util/JobStateToJsonConverter.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvdXRpbC9Kb2JTdGF0ZVRvSnNvbkNvbnZlcnRlci5qYXZh) | `32.75% <11.11%> (+12.54%)` | `6.00 <0.00> (ø)` | |
   | [...g/apache/gobblin/runtime/cli/JobStateStoreCLI.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvY2xpL0pvYlN0YXRlU3RvcmVDTEkuamF2YQ==) | `63.63% <63.63%> (ø)` | `9.00 <9.00> (?)` | |
   | [...c/main/java/org/apache/gobblin/util/FileUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvRmlsZVV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...n/java/org/apache/gobblin/fork/CopyableSchema.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2ZvcmsvQ29weWFibGVTY2hlbWEuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...java/org/apache/gobblin/stream/ControlMessage.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc3RyZWFtL0NvbnRyb2xNZXNzYWdlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...va/org/apache/gobblin/dataset/DatasetResolver.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YXNldC9EYXRhc2V0UmVzb2x2ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...va/org/apache/gobblin/converter/EmptyIterable.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbnZlcnRlci9FbXB0eUl0ZXJhYmxlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...org/apache/gobblin/ack/BasicAckableForTesting.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vYWNrL0Jhc2ljQWNrYWJsZUZvclRlc3RpbmcuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...n/java/org/apache/gobblin/salesforce/SfConfig.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2ZDb25maWcuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/gobblin/yarn/HelixMessageSubTypes.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vSGVsaXhNZXNzYWdlU3ViVHlwZXMuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | ... and [1072 more](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=footer). Last update [92fe239...3ee7c1b](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------



[GitHub] [incubator-gobblin] codecov-io edited a comment on pull request #3189: [GOBBLIN-1350] Adds a CLI for handling state store reads and deletes

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #3189:
URL: https://github.com/apache/incubator-gobblin/pull/3189#issuecomment-757115857


   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=h1) Report
   > Merging [#3189](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=desc) (3ee7c1b) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/92fe239355e5c0354a95c2a1ff93e7afc2f53cae?el=desc) (92fe239) will **increase** coverage by `0.02%`.
   > The diff coverage is `59.66%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3189      +/-   ##
   ============================================
   + Coverage     46.19%   46.21%   +0.02%     
   - Complexity     9721     9778      +57     
   ============================================
     Files          2006     2018      +12     
     Lines         76856    77287     +431     
     Branches       8548     8577      +29     
   ============================================
   + Hits          35500    35720     +220     
   - Misses        38060    38245     +185     
   - Partials       3296     3322      +26     
   ```
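In every Codecov table here the Lines row equals hits + misses + partials (35500 + 38060 + 3296 = 76856 on master), so the headline percentages can be recomputed from the counts. The sketch below assumes a coverage ratio of hits / (hits + misses + partials) and truncates to two decimals rather than rounding. Under that assumption it reproduces the printed 46.19% (master), 9.19% (the earlier report) and 46.21% (this edited report). Rounding would instead give 9.20% and 46.22%. The same arithmetic shows that the earlier -37% came from hits dropping from 35500 to 7107 while only 431 lines were added. That pattern fits an incomplete coverage upload better than a real regression, though the report itself states no cause. `CoverageRatioSketch` and its methods are illustrative names.

```java
public class CoverageRatioSketch {

  /** Coverage in percent, assuming Codecov's ratio hits / (hits + misses + partials). */
  static double coveragePercent(long hits, long misses, long partials) {
    return 100.0 * hits / (hits + misses + partials);
  }

  /** Truncates (not rounds) to two decimals, which matches the figures printed in these reports. */
  static double truncate2(double percent) {
    return Math.floor(percent * 100) / 100;
  }

  public static void main(String[] args) {
    double master = coveragePercent(35500, 38060, 3296);    // 76856 lines
    double firstRun = coveragePercent(7107, 69485, 695);    // 77287 lines, earlier report
    double editedRun = coveragePercent(35720, 38245, 3322); // 77287 lines, this report
    System.out.println(truncate2(master) + " " + truncate2(firstRun) + " " + truncate2(editedRun));
    // Headline deltas: "decrease by 36.99%" and "increase by 0.02%"
    System.out.println(truncate2(master - firstRun) + " " + truncate2(editedRun - master));
  }
}
```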
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../gobblin/runtime/util/JobStateToJsonConverter.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvdXRpbC9Kb2JTdGF0ZVRvSnNvbkNvbnZlcnRlci5qYXZh) | `32.75% <11.11%> (+12.54%)` | `6.00 <0.00> (ø)` | |
   | [...g/apache/gobblin/runtime/cli/JobStateStoreCLI.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvY2xpL0pvYlN0YXRlU3RvcmVDTEkuamF2YQ==) | `63.63% <63.63%> (ø)` | `9.00 <9.00> (?)` | |
   | [...a/org/apache/gobblin/util/limiter/NoopLimiter.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvbGltaXRlci9Ob29wTGltaXRlci5qYXZh) | `40.00% <0.00%> (-20.00%)` | `2.00% <0.00%> (-1.00%)` | |
   | [...he/gobblin/source/PartitionAwareFileRetriever.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9QYXJ0aXRpb25Bd2FyZUZpbGVSZXRyaWV2ZXIuamF2YQ==) | `48.14% <0.00%> (-7.41%)` | `0.00% <0.00%> (ø%)` | |
   | [...lin/util/filesystem/FileSystemInstrumentation.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvZmlsZXN5c3RlbS9GaWxlU3lzdGVtSW5zdHJ1bWVudGF0aW9uLmphdmE=) | `92.85% <0.00%> (-7.15%)` | `3.00% <0.00%> (-1.00%)` | |
   | [...a/org/apache/gobblin/cluster/HelixJobsMapping.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhKb2JzTWFwcGluZy5qYXZh) | `80.32% <0.00%> (-4.92%)` | `15.00% <0.00%> (-2.00%)` | |
   | [...source/extractor/hadoop/HadoopFileInputSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvaGFkb29wL0hhZG9vcEZpbGVJbnB1dFNvdXJjZS5qYXZh) | `59.32% <0.00%> (-4.32%)` | `6.00% <0.00%> (ø%)` | |
   | [...trumented/extractor/InstrumentedExtractorBase.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vaW5zdHJ1bWVudGVkL2V4dHJhY3Rvci9JbnN0cnVtZW50ZWRFeHRyYWN0b3JCYXNlLmphdmE=) | `46.42% <0.00%> (-1.72%)` | `14.00% <0.00%> (ø%)` | |
   | [...tractor/extract/kafka/KafkaStreamingExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9rYWZrYS9LYWZrYVN0cmVhbWluZ0V4dHJhY3Rvci5qYXZh) | `58.82% <0.00%> (-1.67%)` | `20.00% <0.00%> (ø%)` | |
   | [...lin/elasticsearch/writer/FutureCallbackHolder.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tZWxhc3RpY3NlYXJjaC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9lbGFzdGljc2VhcmNoL3dyaXRlci9GdXR1cmVDYWxsYmFja0hvbGRlci5qYXZh) | `61.42% <0.00%> (-1.43%)` | `4.00% <0.00%> (ø%)` | |
   | ... and [27 more](https://codecov.io/gh/apache/incubator-gobblin/pull/3189/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=footer). Last update [92fe239...3ee7c1b](https://codecov.io/gh/apache/incubator-gobblin/pull/3189?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------



[GitHub] [incubator-gobblin] asfgit closed pull request #3189: [GOBBLIN-1350] Adds a CLI for handling state store reads and deletes

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #3189:
URL: https://github.com/apache/incubator-gobblin/pull/3189


   


----------------------------------------------------------------



[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3189: [GOBBLIN-1350] Adds a CLI for handling state store reads and deletes

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3189:
URL: https://github.com/apache/incubator-gobblin/pull/3189#discussion_r554226622



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/cli/JobStateStoreCLI.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.cli;
+
+import com.typesafe.config.Config;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.FsStateStoreFactory;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.JobStateToJsonConverter;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JobConfigurationUtils;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Slf4j
+@Alias(value = "job-state-store", description = "View or delete JobState in state store")
+public class JobStateStoreCLI implements CliApplication {
+
+  Option sysConfigOption = Option.builder("sc").argName("system configuration file")
+      .desc("Gobblin system configuration file (required if no state store URL specified)").longOpt("sysconfig").hasArg().build();
+  Option storeUrlOption = Option.builder("u").argName("gobblin state store URL")
+      .desc("Gobblin state store root path URL (required if no sysconfig specified)").longOpt("storeurl").hasArg().build();
+  Option jobNameOption = Option.builder("n").argName("gobblin job name").desc("Gobblin job name (required for reading)").longOpt("name")
+      .hasArg().build();
+  Option jobIdOption =
+      Option.builder("i").argName("gobblin job id").desc("Gobblin job id").longOpt("id").hasArg().build();
+  Option helpOption =
+      Option.builder("h").argName("help").desc("Usage").longOpt("help").hasArg().build();
+  Option deleteOption =
+      Option.builder("d").argName("delete state").desc("Deletes a state from the state store with a job id")
+          .longOpt("delete").hasArg().build();
+
+  // For reading state store in json format
+  Option getAsJsonOption =
+      Option.builder("r").argName("read job state").desc("Converts a job state to json").longOpt("read-job-state").build();
+  Option convertAllOption =
+      Option.builder("a").desc("Whether to convert all past job states of the given job when viewing as json").longOpt("all").build();
+  Option keepConfigOption =
+      Option.builder("kc").desc("Whether to keep all configuration properties when viewing as json").longOpt("keepConfig").build();
+  Option outputToFile =
+      Option.builder("t").argName("output file name").desc("Output file name when viewing as json").longOpt("toFile").hasArg().build();
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JobStateStoreCLI.class);
+  private StateStore<? extends JobState> jobStateStore;
+
+  CommandLine initializeOptions(String[] args) {
+    Options options = new Options();
+    options.addOption(sysConfigOption);
+    options.addOption(storeUrlOption);
+    options.addOption(jobNameOption);
+    options.addOption(jobIdOption);
+    options.addOption(deleteOption);
+    options.addOption(getAsJsonOption);
+    options.addOption(convertAllOption);
+    options.addOption(keepConfigOption);
+    options.addOption(outputToFile);
+
+    CommandLine cmd = null;
+
+    try {
+      CommandLineParser parser = new DefaultParser();
+      cmd = parser.parse(options, Arrays.copyOfRange(args, 1, args.length));
+    } catch (ParseException pe) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("JobStateStoreCLI", options);
+      throw new RuntimeException(pe);
+    }
+
+    if (!cmd.hasOption(sysConfigOption.getLongOpt()) && !cmd.hasOption(storeUrlOption.getLongOpt()) ){
+      System.out.println("State store configuration or state store url options missing");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("JobStateStoreCLI", options);
+      return null;
+    }
+
+    if (cmd.hasOption(getAsJsonOption.getOpt()) && !cmd.hasOption(jobNameOption.getOpt())) {
+      System.out.println("Job name option missing for reading job states as json");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("JobStateStoreCLI", options);
+      return null;
+    }
+
+    if (cmd.hasOption(helpOption.getOpt())) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("JobStateStoreCLI", options);
+      return null;
+    }
+
+    return cmd;
+  }
+
+
+  @Override
+  public void run(String[] args) throws Exception {
+    CommandLine cmd = initializeOptions(args);
+    if (cmd == null) {
+      return; // incorrect args were called
+    }
+
+    Properties props = new Properties();
+
+    if (cmd.hasOption(sysConfigOption.getOpt())) {
+      props = JobConfigurationUtils.fileToProperties(cmd.getOptionValue(sysConfigOption.getOpt()));
+    }
+
+    String storeUrl = cmd.getOptionValue(storeUrlOption.getLongOpt());
+    if (StringUtils.isNotBlank(storeUrl)) {
+      props.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, storeUrl);
+    }
+
+    Config stateStoreConfig = ConfigUtils.propertiesToConfig(props);
+    ClassAliasResolver<StateStore.Factory> resolver =
+        new ClassAliasResolver<>(StateStore.Factory.class);
+    StateStore.Factory stateStoreFactory;
+
+    try {
+      stateStoreFactory = resolver.resolveClass(ConfigUtils.getString(stateStoreConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY,
+          FsStateStoreFactory.class.getName())).newInstance();
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+    this.jobStateStore = stateStoreFactory.createStateStore(stateStoreConfig, JobState.class);
+
+    if (cmd.hasOption(getAsJsonOption.getOpt())) {
+      this.viewStateAsJson(cmd);
+    } else if (cmd.hasOption(deleteOption.getOpt())) {
+      Path filePath = new Path(cmd.getOptionValue(deleteOption.getOpt()));

Review comment:
       let's move the delete part to a separate method.
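Here is a minimal sketch of how the delete branch of `run()` could be pulled into its own method. It assumes a hypothetical one-method `JobStateStore` interface that stands in for the `jobStateStore.delete(jobName)` call in the diff. `BulkDeleteSketch`, `deleteJobStates` and `RecordingStore` are illustrative names, not Gobblin code. The sketch uses `java.nio.file.Path` instead of the Hadoop `Path` in the diff, because the diff only reads a local file through `FileInputStream` anyway.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class BulkDeleteSketch {

  /** Hypothetical stand-in for the single-argument delete used in the diff. */
  interface JobStateStore {
    void delete(String storeName) throws IOException;
  }

  /** Test double that records deletions and fails for job names starting with "bad". */
  static class RecordingStore implements JobStateStore {
    final List<String> deleted = new ArrayList<>();

    @Override
    public void delete(String storeName) throws IOException {
      if (storeName.startsWith("bad")) {
        throw new IOException("simulated failure for " + storeName);
      }
      deleted.add(storeName);
    }
  }

  /**
   * Deletes the state of every job named in the reader (one name per line, blank lines skipped)
   * and returns the names that could not be deleted instead of aborting on the first failure.
   */
  static List<String> deleteJobStates(JobStateStore store, BufferedReader jobNames) throws IOException {
    List<String> failed = new ArrayList<>();
    String line;
    while ((line = jobNames.readLine()) != null) {
      String jobName = line.trim();
      if (jobName.isEmpty()) {
        continue;
      }
      try {
        store.delete(jobName);
      } catch (IOException e) {
        failed.add(jobName);
      }
    }
    return failed;
  }

  /** File-based entry point that the delete option could call with the path it was given. */
  static List<String> deleteJobStates(JobStateStore store, Path jobNamesFile) throws IOException {
    try (BufferedReader reader = Files.newBufferedReader(jobNamesFile, StandardCharsets.UTF_8)) {
      return deleteJobStates(store, reader);
    }
  }

  public static void main(String[] args) throws IOException {
    RecordingStore store = new RecordingStore();
    List<String> failed =
        deleteJobStates(store, new BufferedReader(new StringReader("jobA\n\nbadJob\n jobB \n")));
    System.out.println("deleted=" + store.deleted + " failed=" + failed);
  }
}
```

Accepting a `BufferedReader` lets the loop be unit-tested without a file. Skipping blank lines and reporting the failed names at the end are choices made in this sketch, not behaviour taken from the PR.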




----------------------------------------------------------------



[GitHub] [incubator-gobblin] arjun4084346 commented on pull request #3189: [GOBBLIN-1350] Adds a CLI for handling state store reads and deletes

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on pull request #3189:
URL: https://github.com/apache/incubator-gobblin/pull/3189#issuecomment-757029476


   minor comments. LGTM


----------------------------------------------------------------


