You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sh...@apache.org on 2017/02/27 09:35:42 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2218: Creation of RegexSplitter operator

Repository: apex-malhar
Updated Branches:
  refs/heads/master ec7b480ac -> 23061c224


APEXMALHAR-2218: Creation of RegexSplitter operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ffba8a22
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ffba8a22
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ffba8a22

Branch: refs/heads/master
Commit: ffba8a2258f9ced1eb430f13e8909bbdf1c173c7
Parents: dd5341f
Author: venkateshDT <ve...@datatorrent.com>
Authored: Tue Aug 30 14:59:44 2016 -0700
Committer: venkateshDT <ve...@datatorrent.com>
Committed: Mon Feb 27 01:29:01 2017 -0800

----------------------------------------------------------------------
 .../datatorrent/contrib/parser/RegexParser.java | 234 +++++++++++++++++
 .../contrib/parser/RegexParserTest.java         | 261 +++++++++++++++++++
 .../src/test/resources/RegexSplitterschema.json |  20 ++
 3 files changed, 515 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ffba8a22/contrib/src/main/java/com/datatorrent/contrib/parser/RegexParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/RegexParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/RegexParser.java
new file mode 100644
index 0000000..a68c928
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/RegexParser.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.beanutils.ConversionException;
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.DateConverter;
+import org.apache.commons.beanutils.converters.DateTimeConverter;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Operator parses tuple based on regex pattern and populates POJO matching the user defined schema <br>
+ * This operator expects the upstream operator to send every line in the file as a byte array.
+ * splitRegexPattern contains the regex pattern of lines in the file <br>
+ * Schema is specified in a json format as per {@link DelimitedSchema} that
+ * contains field information and constraints for each field.<br>
+ * Schema field names should match with the POJO variable names<br>
+ * Assumption is that each field in the delimited data should map to a simple
+ * java type.<br>
+ * <br>
+ * <b>Properties</b> <br>
+ * <b>splitRegexPattern</b>:Regex pattern as a string<br>
+ *
+ * @displayName RegexParser
+ * @category Parsers
+ * @tags pojo parser regex logs server
+ * @since 3.7.0
+ */
+@InterfaceStability.Evolving
+public class RegexParser extends Parser<byte[], KeyValPair<String, String>>
+{
+  /**
+   * Contents of the schema.Schema is specified in a json format as per
+   * {@link DelimitedSchema}
+   */
+  @NotNull
+  private String schema;
+  /**
+   * Schema is read into this object to access fields
+   */
+  private transient DelimitedSchema delimitedParserSchema;
+  /**
+   * Regex Pattern defined for the tuple
+   */
+  @NotNull
+  private String splitRegexPattern;
+  /**
+   * Pattern to store the compiled regex
+   */
+  private transient Pattern pattern;
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    delimitedParserSchema = new DelimitedSchema(schema);
+    pattern = Pattern.compile(splitRegexPattern);
+  }
+
+  @Override
+  public void processTuple(byte[] tuple)
+  {
+    if (tuple == null) {
+      if (err.isConnected()) {
+        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
+      }
+      errorTupleCount++;
+      return;
+    }
+    String incomingString = new String(tuple);
+    if (StringUtils.isBlank(incomingString)) {
+      if (err.isConnected()) {
+        err.emit(new KeyValPair<String, String>(incomingString, "Blank tuple"));
+      }
+      errorTupleCount++;
+      return;
+    }
+    try {
+      if (out.isConnected() && clazz != null) {
+        Matcher matcher = pattern.matcher(incomingString);
+        boolean patternMatched = false;
+        Constructor<?> ctor = clazz.getConstructor();
+        Object object = ctor.newInstance();
+
+        if (matcher.find()) {
+          for (int i = 0; i <= matcher.groupCount()-1; i++) {
+            if (delimitedParserSchema.getFields().get(i).getType() == DelimitedSchema.FieldType.DATE) {
+              DateTimeConverter dtConverter = new DateConverter();
+              dtConverter.setPattern((String)delimitedParserSchema.getFields().get(i).getConstraints().get(DelimitedSchema.DATE_FORMAT));
+              ConvertUtils.register(dtConverter, Date.class);
+            }
+            BeanUtils.setProperty(object, delimitedParserSchema.getFields().get(i).getName(), matcher.group(i+1));
+          }
+          patternMatched = true;
+        }
+        if (!patternMatched) {
+          throw new ConversionException("The incoming tuple do not match with the Regex pattern defined.");
+        }
+
+        out.emit(object);
+        emittedObjectCount++;
+      }
+
+    } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException | InstantiationException | ConversionException e) {
+      if (err.isConnected()) {
+        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
+        logger.debug("Regex Expression : {} Incoming tuple : {}", splitRegexPattern, incomingString);
+      }
+      errorTupleCount++;
+      logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
+    }
+  }
+
+  /**
+   * Set the schema that defines the format of the tuple
+   *
+   * @param schema
+   */
+  public void setSchema(String schema)
+  {
+    this.schema = schema;
+  }
+
+  /**
+   * Set the Regex Pattern expected for the incoming tuple
+   *
+   * @param splitRegexPattern
+   */
+  public void setSplitRegexPattern(String splitRegexPattern)
+  {
+    this.splitRegexPattern = splitRegexPattern;
+  }
+
+  /**
+   * Get the schema value
+   *
+   * @return schema
+   */
+  public String getSchema()
+  {
+    return schema;
+  }
+
+  /**
+   * Get the Regex Pattern value
+   *
+   * @return splitRegexPattern
+   */
+  public String getSplitRegexPattern()
+  {
+    return splitRegexPattern;
+  }
+
+  @Override
+  public Object convert(byte[] tuple)
+  {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public KeyValPair<String, String> processErrorTuple(byte[] input)
+  {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Get errorTupleCount
+   *
+   * @return errorTupleCount
+   */
+  @VisibleForTesting
+  public long getErrorTupleCount()
+  {
+    return errorTupleCount;
+  }
+
+  /**
+   * Get emittedObjectCount
+   *
+   * @return emittedObjectCount
+   */
+  @VisibleForTesting
+  public long getEmittedObjectCount()
+  {
+    return emittedObjectCount;
+  }
+
+  /**
+   * Get incomingTuplesCount
+   *
+   * @return incomingTuplesCount
+   */
+  @VisibleForTesting
+  public long getIncomingTuplesCount()
+  {
+    return incomingTuplesCount;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(RegexParser.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ffba8a22/contrib/src/test/java/com/datatorrent/contrib/parser/RegexParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/RegexParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/RegexParserTest.java
new file mode 100644
index 0000000..ee6e029
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/parser/RegexParserTest.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Tests for {@link RegexParser}. Every test runs against an operator configured by
+ * {@link Watcher}: the schema from RegexSplitterschema.json, {@link ServerLog} as the POJO class
+ * and a pattern with three capturing groups (date, id, message).
+ */
+public class RegexParserTest
+{
+  RegexParser regex = new RegexParser();
+  // Collects the tuples emitted on the error port of the operator.
+  private CollectorTestSink<Object> error = new CollectorTestSink<Object>();
+
+  // Collects the POJOs emitted on the output port of the operator.
+  private CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
+
+  @Rule
+  public Watcher watcher = new Watcher();
+
+  /**
+   * Wires the sinks to the operator ports and configures and sets up the operator before each
+   * test.
+   */
+  public class Watcher extends TestWatcher
+  {
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      regex.err.setSink(error);
+      regex.out.setSink(pojoPort);
+      regex.setSchema(SchemaUtils.jarResourceFileToString("RegexSplitterschema.json"));
+      // Group 1: colon separated date, group 2: numeric id, group 3: remainder of the line.
+      regex.setSplitRegexPattern(".+\\[SEQ=\\w+\\]\\s*(\\d+:[\\d\\d:]+)\\s(\\d+)\\s*(.+)");
+      regex.setClazz(ServerLog.class);
+      regex.setup(null);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+    }
+
+  }
+
+  /**
+   * A line matching the pattern yields one ServerLog POJO with date, id and message populated and
+   * no error tuple.
+   */
+  @Test
+  public void TestValidInputCase() throws ParseException
+  {
+    regex.beginWindow(0);
+    String line = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717]" +
+      " 2015:10:01:03:14:49 101 sign-in_id=11111@psop.com ip_address=1.1.1.1  service_id=IP1234-NPB12345_00 " +
+      "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf  account_id=11111  platform=pik";
+    regex.in.process(line.getBytes());
+    regex.endWindow();
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+    Object obj = pojoPort.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(ServerLog.class, obj.getClass());
+    ServerLog pojo = (ServerLog)obj;
+    Assert.assertEquals(101, pojo.getId());
+    // Same date format as the "format" constraint of the date field in RegexSplitterschema.json.
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy:MM:dd:hh:mm:ss");
+    Date date = sdf.parse("2015:10:01:03:14:49");
+    Assert.assertEquals(date, pojo.getDate());
+    Assert.assertEquals("sign-in_id=11111@psop.com ip_address=1.1.1.1  service_id=IP1234-NPB12345_00"
+      + " result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf  account_id=11111  " +
+      "platform=pik", pojo.getMessage());
+    Assert.assertEquals(1, regex.getIncomingTuplesCount());
+    Assert.assertEquals(1, regex.getEmittedObjectCount());
+  }
+
+  /**
+   * An empty tuple is counted as an error and emitted on the error port.
+   */
+  @Test
+  public void testEmptyInput() throws JSONException
+  {
+    String tuple = "";
+    regex.beginWindow(0);
+    regex.in.process(tuple.getBytes());
+    regex.endWindow();
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    Assert.assertEquals(1, regex.getIncomingTuplesCount());
+    Assert.assertEquals(1, regex.getErrorTupleCount());
+  }
+
+  /**
+   * A line carrying "qwerty" where the date is expected does not match the pattern; the line is
+   * emitted on the error port with the pattern mismatch message.
+   */
+  @Test
+  public void TestInValidDateInputCase() throws ParseException
+  {
+    regex.beginWindow(0);
+    String line = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717]"
+      + " qwerty 101 sign-in_id=11111@psop.com ip_address=1.1.1.1  service_id=IP1234-NPB12345_00"
+      + " result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf  account_id=11111  platform=pik";
+    regex.in.process(line.getBytes());
+    regex.endWindow();
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> obj = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    // The key of the error tuple is the unmodified incoming line, the value is the reason.
+    Assert.assertEquals("2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717]" +
+        " qwerty 101 sign-in_id=11111@psop.com ip_address=1.1.1.1  service_id=IP1234-NPB12345_00 " +
+        "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf  account_id=11111  platform=pik"
+      , obj.getKey());
+    Assert.assertEquals("The incoming tuple do not match with the Regex pattern defined.", obj.getValue());
+    Assert.assertEquals(1, regex.getIncomingTuplesCount());
+    Assert.assertEquals(0, regex.getEmittedObjectCount());
+    Assert.assertEquals(1, regex.getErrorTupleCount());
+  }
+
+  /**
+   * A line carrying letters where the numeric id is expected does not match the pattern; the line
+   * is emitted on the error port with the pattern mismatch message.
+   */
+  @Test
+  public void TestInValidIntInputCase() throws ParseException
+  {
+    regex.beginWindow(0);
+    String line = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717] " +
+      "2015:10:01:03:14:46 hskhhskfk sign-in_id=11111@psop.com ip_address=1.1.1.1  service_id=IP1234-NPB12345_00 " +
+      "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf  account_id=11111  platform=pik";
+    regex.in.process(line.getBytes());
+    regex.endWindow();
+    KeyValPair<String, String> obj = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals("2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717] " +
+        "2015:10:01:03:14:46 hskhhskfk sign-in_id=11111@psop.com ip_address=1.1.1.1  service_id=IP1234-NPB12345_00 " +
+        "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf  account_id=11111  platform=pik",
+      obj.getKey());
+    Assert.assertEquals("The incoming tuple do not match with the Regex pattern defined.", obj.getValue());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    Assert.assertEquals(1, regex.getIncomingTuplesCount());
+    Assert.assertEquals(0, regex.getEmittedObjectCount());
+    Assert.assertEquals(1, regex.getErrorTupleCount());
+  }
+
+  /**
+   * A null tuple is counted as an error and emitted on the error port.
+   */
+  @Test
+  public void testNullInput() throws JSONException
+  {
+    regex.beginWindow(0);
+    regex.in.process(null);
+    regex.endWindow();
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    Assert.assertEquals(1, regex.getIncomingTuplesCount());
+    Assert.assertEquals(1, regex.getErrorTupleCount());
+  }
+
+  /**
+   * The counters start at zero in a new window; one valid line raises the incoming and emitted
+   * counts to one and leaves the error count at zero.
+   */
+  @Test
+  public void testParserValidInputMetricVerification()
+  {
+    regex.beginWindow(0);
+    Assert.assertEquals(0, regex.getIncomingTuplesCount());
+    Assert.assertEquals(0, regex.getErrorTupleCount());
+    Assert.assertEquals(0, regex.getEmittedObjectCount());
+    String tuple = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717] " +
+      "2015:10:01:03:14:49 101 sign-in_id=11111@psop.com ip_address=1.1.1.1  service_id=IP1234-NPB12345_00 " +
+      "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf  account_id=11111  platform=pik";
+    regex.in.process(tuple.getBytes());
+    regex.endWindow();
+    Assert.assertEquals(1, regex.getIncomingTuplesCount());
+    Assert.assertEquals(0, regex.getErrorTupleCount());
+    Assert.assertEquals(1, regex.getEmittedObjectCount());
+  }
+
+  /**
+   * One line that does not match the pattern raises the incoming and error counts to one and
+   * leaves the emitted count at zero.
+   */
+  @Test
+  public void testParserInvalidInputMetricVerification()
+  {
+    regex.beginWindow(0);
+    Assert.assertEquals(0, regex.getIncomingTuplesCount());
+    Assert.assertEquals(0, regex.getErrorTupleCount());
+    Assert.assertEquals(0, regex.getEmittedObjectCount());
+    String tuple = "{" + "\"id\": 2" + "}";
+    regex.in.process(tuple.getBytes());
+    regex.endWindow();
+    Assert.assertEquals(1, regex.getIncomingTuplesCount());
+    Assert.assertEquals(1, regex.getErrorTupleCount());
+    Assert.assertEquals(0, regex.getEmittedObjectCount());
+  }
+
+  /**
+   * The counters are back at zero after the next beginWindow call and count again from there.
+   */
+  @Test
+  public void testParserMetricResetVerification()
+  {
+    Assert.assertEquals(0, regex.getIncomingTuplesCount());
+    Assert.assertEquals(0, regex.getErrorTupleCount());
+    Assert.assertEquals(0, regex.getEmittedObjectCount());
+    String tuple = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717] " +
+      "2015:10:01:03:14:49 101 sign-in_id=11111@psop.com ip_address=1.1.1.1  service_id=IP1234-NPB12345_00 " +
+      "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf  account_id=11111  platform=pik";
+    regex.beginWindow(0);
+    regex.in.process(tuple.getBytes());
+    regex.endWindow();
+    Assert.assertEquals(1, regex.getIncomingTuplesCount());
+    Assert.assertEquals(0, regex.getErrorTupleCount());
+    Assert.assertEquals(1, regex.getEmittedObjectCount());
+    // Starting the second window must reset all three counters.
+    regex.beginWindow(1);
+    Assert.assertEquals(0, regex.getIncomingTuplesCount());
+    Assert.assertEquals(0, regex.getErrorTupleCount());
+    Assert.assertEquals(0, regex.getEmittedObjectCount());
+    regex.in.process(tuple.getBytes());
+    Assert.assertEquals(1, regex.getIncomingTuplesCount());
+    Assert.assertEquals(0, regex.getErrorTupleCount());
+    Assert.assertEquals(1, regex.getEmittedObjectCount());
+    regex.endWindow();
+  }
+
+  /**
+   * POJO populated by the parser in these tests; its property names match the field names in
+   * RegexSplitterschema.json.
+   */
+  public static class ServerLog
+  {
+    private String message;
+    private Date date;
+    private int id;
+
+    public String getMessage()
+    {
+      return message;
+    }
+
+    public void setMessage(String message)
+    {
+      this.message = message;
+    }
+
+    public int getId()
+    {
+      return id;
+    }
+
+    public void setId(int id)
+    {
+      this.id = id;
+    }
+
+    public Date getDate()
+    {
+      return date;
+    }
+
+    public void setDate(Date date)
+    {
+      this.date = date;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ffba8a22/contrib/src/test/resources/RegexSplitterschema.json
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/RegexSplitterschema.json b/contrib/src/test/resources/RegexSplitterschema.json
new file mode 100644
index 0000000..e62b6c4
--- /dev/null
+++ b/contrib/src/test/resources/RegexSplitterschema.json
@@ -0,0 +1,20 @@
+{
+    "fields": [
+        {
+            "name": "date",
+            "type": "Date",
+            "constraints": {
+                "format": "yyyy:MM:dd:hh:mm:ss"
+            }
+        },
+        {
+            "name": "id",
+            "type": "Integer"
+        },
+        {
+            "name": "message",
+            "type": "String"
+
+        }
+    ]
+}
\ No newline at end of file


[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2218-RegexSplitter' of https://github.com/venkateshkottapalli/apex-malhar

Posted by sh...@apache.org.
Merge branch 'APEXMALHAR-2218-RegexSplitter' of https://github.com/venkateshkottapalli/apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/23061c22
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/23061c22
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/23061c22

Branch: refs/heads/master
Commit: 23061c224d432b258580e6ab04ab42c0589da86c
Parents: ec7b480 ffba8a2
Author: shubham <sh...@gmail.com>
Authored: Mon Feb 27 15:03:12 2017 +0530
Committer: shubham <sh...@gmail.com>
Committed: Mon Feb 27 15:03:12 2017 +0530

----------------------------------------------------------------------
 .../datatorrent/contrib/parser/RegexParser.java | 234 +++++++++++++++++
 .../contrib/parser/RegexParserTest.java         | 261 +++++++++++++++++++
 .../src/test/resources/RegexSplitterschema.json |  20 ++
 3 files changed, 515 insertions(+)
----------------------------------------------------------------------