You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/09/20 15:20:18 UTC

incubator-metron git commit: METRON-428: Allow a kafka offset to be passed to the ParserTopology CLI closes apache/incubator-metron#258

Repository: incubator-metron
Updated Branches:
  refs/heads/master eb6739399 -> 5843421f5


METRON-428: Allow a kafka offset to be passed to the ParserTopology CLI closes apache/incubator-metron#258


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/5843421f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/5843421f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/5843421f

Branch: refs/heads/master
Commit: 5843421f5205d007569b274363adc3ee03dc8efe
Parents: eb67393
Author: cstella <ce...@gmail.com>
Authored: Tue Sep 20 11:20:05 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue Sep 20 11:20:05 2016 -0400

----------------------------------------------------------------------
 .../metron/parsers/topology/ParserTopologyCLI.java  | 10 ++++++++++
 .../parsers/topology/ParserTopologyCLITest.java     | 16 ++++++++++++++++
 2 files changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5843421f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 7bf56e3..0ebec6c 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -183,6 +183,13 @@ public class ParserTopologyCLI {
       o.setRequired(false);
       return o;
     })
+    ,KAFKA_OFFSET("koff", code ->
+    {
+      Option o = new Option("koff", "kafka_offset", true, "Kafka offset");
+      o.setArgName("BEGINNING|WHERE_I_LEFT_OFF");
+      o.setRequired(false);
+      return o;
+    })
     ;
     Option option;
     String shortCode;
@@ -285,6 +292,9 @@ public class ParserTopologyCLI {
         spoutConfig = readSpoutConfig(new File(ParserOptions.SPOUT_CONFIG.get(cmd)));
       }
       SpoutConfig.Offset offset = cmd.hasOption("t") ? SpoutConfig.Offset.BEGINNING : SpoutConfig.Offset.WHERE_I_LEFT_OFF;
+      if(cmd.hasOption("koff")) {
+        offset = SpoutConfig.Offset.valueOf(cmd.getOptionValue("koff"));
+      }
       TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl,
               brokerUrl,
               sensorType,

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5843421f/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
index 5f0548a..2940bfe 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
@@ -74,6 +74,22 @@ public class ParserTopologyCLITest {
 
 
   @Test
+  public void testKafkaOffset_happyPath() throws ParseException {
+    kafkaOffset(true);
+    kafkaOffset(false);
+  }
+  public void kafkaOffset(boolean longOpt) throws ParseException {
+    CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
+                                      .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
+                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+                                      .with(ParserTopologyCLI.ParserOptions.KAFKA_OFFSET, "BEGINNING")
+                                      .build(longOpt);
+    Assert.assertEquals("myzk", ParserTopologyCLI.ParserOptions.ZK_QUORUM.get(cli));
+    Assert.assertEquals("mybroker", ParserTopologyCLI.ParserOptions.BROKER_URL.get(cli));
+    Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPE.get(cli));
+    Assert.assertEquals("BEGINNING", ParserTopologyCLI.ParserOptions.KAFKA_OFFSET.get(cli));
+  }
+  @Test
   public void testCLI_happyPath() throws ParseException {
     happyPath(true);
     happyPath(false);