You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sh...@apache.org on 2017/02/27 09:35:42 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2218: Creation of
RegexSplitter operator
Repository: apex-malhar
Updated Branches:
refs/heads/master ec7b480ac -> 23061c224
APEXMALHAR-2218: Creation of RegexSplitter operator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ffba8a22
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ffba8a22
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ffba8a22
Branch: refs/heads/master
Commit: ffba8a2258f9ced1eb430f13e8909bbdf1c173c7
Parents: dd5341f
Author: venkateshDT <ve...@datatorrent.com>
Authored: Tue Aug 30 14:59:44 2016 -0700
Committer: venkateshDT <ve...@datatorrent.com>
Committed: Mon Feb 27 01:29:01 2017 -0800
----------------------------------------------------------------------
.../datatorrent/contrib/parser/RegexParser.java | 234 +++++++++++++++++
.../contrib/parser/RegexParserTest.java | 261 +++++++++++++++++++
.../src/test/resources/RegexSplitterschema.json | 20 ++
3 files changed, 515 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ffba8a22/contrib/src/main/java/com/datatorrent/contrib/parser/RegexParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/RegexParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/RegexParser.java
new file mode 100644
index 0000000..a68c928
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/RegexParser.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.beanutils.ConversionException;
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.DateConverter;
+import org.apache.commons.beanutils.converters.DateTimeConverter;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Operator parses tuple based on regex pattern and populates POJO matching the user defined schema <br>
+ * This operator expects the upstream operator to send every line in the file as a byte array.
+ * splitRegexPattern contains the regex pattern of lines in the file <br>
+ * Schema is specified in a json format as per {@link DelimitedSchema} that
+ * contains field information and constraints for each field.<br>
+ * Schema field names should match with the POJO variable names<br>
+ * Assumption is that each field in the delimited data should map to a simple
+ * java type.<br>
+ * <br>
+ * <b>Properties</b> <br>
+ * <b>splitRegexPattern</b>:Regex pattern as a string<br>
+ *
+ * @displayName RegexParser
+ * @category Parsers
+ * @tags pojo parser regex logs server
+ * @since 3.7.0
+ */
+@InterfaceStability.Evolving
+public class RegexParser extends Parser<byte[], KeyValPair<String, String>>
+{
+ /**
+ * Contents of the schema.Schema is specified in a json format as per
+ * {@link DelimitedSchema}
+ */
+ @NotNull
+ private String schema;
+ /**
+ * Schema is read into this object to access fields
+ */
+ private transient DelimitedSchema delimitedParserSchema;
+ /**
+ * Regex Pattern defined for the tuple
+ */
+ @NotNull
+ private String splitRegexPattern;
+ /**
+ * Pattern to store the compiled regex
+ */
+ private transient Pattern pattern;
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ delimitedParserSchema = new DelimitedSchema(schema);
+ pattern = Pattern.compile(splitRegexPattern);
+ }
+
+ @Override
+ public void processTuple(byte[] tuple)
+ {
+ if (tuple == null) {
+ if (err.isConnected()) {
+ err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
+ }
+ errorTupleCount++;
+ return;
+ }
+ String incomingString = new String(tuple);
+ if (StringUtils.isBlank(incomingString)) {
+ if (err.isConnected()) {
+ err.emit(new KeyValPair<String, String>(incomingString, "Blank tuple"));
+ }
+ errorTupleCount++;
+ return;
+ }
+ try {
+ if (out.isConnected() && clazz != null) {
+ Matcher matcher = pattern.matcher(incomingString);
+ boolean patternMatched = false;
+ Constructor<?> ctor = clazz.getConstructor();
+ Object object = ctor.newInstance();
+
+ if (matcher.find()) {
+ for (int i = 0; i <= matcher.groupCount()-1; i++) {
+ if (delimitedParserSchema.getFields().get(i).getType() == DelimitedSchema.FieldType.DATE) {
+ DateTimeConverter dtConverter = new DateConverter();
+ dtConverter.setPattern((String)delimitedParserSchema.getFields().get(i).getConstraints().get(DelimitedSchema.DATE_FORMAT));
+ ConvertUtils.register(dtConverter, Date.class);
+ }
+ BeanUtils.setProperty(object, delimitedParserSchema.getFields().get(i).getName(), matcher.group(i+1));
+ }
+ patternMatched = true;
+ }
+ if (!patternMatched) {
+ throw new ConversionException("The incoming tuple do not match with the Regex pattern defined.");
+ }
+
+ out.emit(object);
+ emittedObjectCount++;
+ }
+
+ } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException | InstantiationException | ConversionException e) {
+ if (err.isConnected()) {
+ err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
+ logger.debug("Regex Expression : {} Incoming tuple : {}", splitRegexPattern, incomingString);
+ }
+ errorTupleCount++;
+ logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
+ }
+ }
+
+ /**
+ * Set the schema that defines the format of the tuple
+ *
+ * @param schema
+ */
+ public void setSchema(String schema)
+ {
+ this.schema = schema;
+ }
+
+ /**
+ * Set the Regex Pattern expected for the incoming tuple
+ *
+ * @param splitRegexPattern
+ */
+ public void setSplitRegexPattern(String splitRegexPattern)
+ {
+ this.splitRegexPattern = splitRegexPattern;
+ }
+
+ /**
+ * Get the schema value
+ *
+ * @return schema
+ */
+ public String getSchema()
+ {
+ return schema;
+ }
+
+ /**
+ * Get the Regex Pattern value
+ *
+ * @return splitRegexPattern
+ */
+ public String getSplitRegexPattern()
+ {
+ return splitRegexPattern;
+ }
+
+ @Override
+ public Object convert(byte[] tuple)
+ {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ @Override
+ public KeyValPair<String, String> processErrorTuple(byte[] input)
+ {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ /**
+ * Get errorTupleCount
+ *
+ * @return errorTupleCount
+ */
+ @VisibleForTesting
+ public long getErrorTupleCount()
+ {
+ return errorTupleCount;
+ }
+
+ /**
+ * Get emittedObjectCount
+ *
+ * @return emittedObjectCount
+ */
+ @VisibleForTesting
+ public long getEmittedObjectCount()
+ {
+ return emittedObjectCount;
+ }
+
+ /**
+ * Get incomingTuplesCount
+ *
+ * @return incomingTuplesCount
+ */
+ @VisibleForTesting
+ public long getIncomingTuplesCount()
+ {
+ return incomingTuplesCount;
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(RegexParser.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ffba8a22/contrib/src/test/java/com/datatorrent/contrib/parser/RegexParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/RegexParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/RegexParserTest.java
new file mode 100644
index 0000000..ee6e029
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/parser/RegexParserTest.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class RegexParserTest
+{
+ RegexParser regex = new RegexParser();
+ private CollectorTestSink<Object> error = new CollectorTestSink<Object>();
+
+ private CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
+
+ @Rule
+ public Watcher watcher = new Watcher();
+
+ public class Watcher extends TestWatcher
+ {
+ @Override
+ protected void starting(Description description)
+ {
+ super.starting(description);
+ regex.err.setSink(error);
+ regex.out.setSink(pojoPort);
+ regex.setSchema(SchemaUtils.jarResourceFileToString("RegexSplitterschema.json"));
+ regex.setSplitRegexPattern(".+\\[SEQ=\\w+\\]\\s*(\\d+:[\\d\\d:]+)\\s(\\d+)\\s*(.+)");
+ regex.setClazz(ServerLog.class);
+ regex.setup(null);
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ super.finished(description);
+ }
+
+ }
+
+ @Test
+ public void TestValidInputCase() throws ParseException
+ {
+ regex.beginWindow(0);
+ String line = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717]" +
+ " 2015:10:01:03:14:49 101 sign-in_id=11111@psop.com ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " +
+ "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik";
+ regex.in.process(line.getBytes());
+ regex.endWindow();
+ Assert.assertEquals(1, pojoPort.collectedTuples.size());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ Object obj = pojoPort.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(ServerLog.class, obj.getClass());
+ ServerLog pojo = (ServerLog)obj;
+ Assert.assertEquals(101, pojo.getId());
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy:MM:dd:hh:mm:ss");
+ Date date = sdf.parse("2015:10:01:03:14:49");
+ Assert.assertEquals(date, pojo.getDate());
+ Assert.assertEquals("sign-in_id=11111@psop.com ip_address=1.1.1.1 service_id=IP1234-NPB12345_00"
+ + " result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 " +
+ "platform=pik", pojo.getMessage());
+ Assert.assertEquals(1, regex.getIncomingTuplesCount());
+ Assert.assertEquals(1, regex.getEmittedObjectCount());
+ }
+
+ @Test
+ public void testEmptyInput() throws JSONException
+ {
+ String tuple = "";
+ regex.beginWindow(0);
+ regex.in.process(tuple.getBytes());
+ regex.endWindow();
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ Assert.assertEquals(1, regex.getIncomingTuplesCount());
+ Assert.assertEquals(1, regex.getErrorTupleCount());
+ }
+
+ @Test
+ public void TestInValidDateInputCase() throws ParseException
+ {
+ regex.beginWindow(0);
+ String line = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717]"
+ + " qwerty 101 sign-in_id=11111@psop.com ip_address=1.1.1.1 service_id=IP1234-NPB12345_00"
+ + " result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik";
+ regex.in.process(line.getBytes());
+ regex.endWindow();
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> obj = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals("2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717]" +
+ " qwerty 101 sign-in_id=11111@psop.com ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " +
+ "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik"
+ , obj.getKey());
+ Assert.assertEquals("The incoming tuple do not match with the Regex pattern defined.", obj.getValue());
+ Assert.assertEquals(1, regex.getIncomingTuplesCount());
+ Assert.assertEquals(0, regex.getEmittedObjectCount());
+ Assert.assertEquals(1, regex.getErrorTupleCount());
+ }
+
+ @Test
+ public void TestInValidIntInputCase() throws ParseException
+ {
+ regex.beginWindow(0);
+ String line = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717] " +
+ "2015:10:01:03:14:46 hskhhskfk sign-in_id=11111@psop.com ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " +
+ "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik";
+ regex.in.process(line.getBytes());
+ regex.endWindow();
+ KeyValPair<String, String> obj = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals("2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717] " +
+ "2015:10:01:03:14:46 hskhhskfk sign-in_id=11111@psop.com ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " +
+ "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik",
+ obj.getKey());
+ Assert.assertEquals("The incoming tuple do not match with the Regex pattern defined.", obj.getValue());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ Assert.assertEquals(1, regex.getIncomingTuplesCount());
+ Assert.assertEquals(0, regex.getEmittedObjectCount());
+ Assert.assertEquals(1, regex.getErrorTupleCount());
+ }
+
+ @Test
+ public void testNullInput() throws JSONException
+ {
+ regex.beginWindow(0);
+ regex.in.process(null);
+ regex.endWindow();
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ Assert.assertEquals(1, regex.getIncomingTuplesCount());
+ Assert.assertEquals(1, regex.getErrorTupleCount());
+ }
+
+ @Test
+ public void testParserValidInputMetricVerification()
+ {
+ regex.beginWindow(0);
+ Assert.assertEquals(0, regex.getIncomingTuplesCount());
+ Assert.assertEquals(0, regex.getErrorTupleCount());
+ Assert.assertEquals(0, regex.getEmittedObjectCount());
+ String tuple = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717] " +
+ "2015:10:01:03:14:49 101 sign-in_id=11111@psop.com ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " +
+ "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik";
+ regex.in.process(tuple.getBytes());
+ regex.endWindow();
+ Assert.assertEquals(1, regex.getIncomingTuplesCount());
+ Assert.assertEquals(0, regex.getErrorTupleCount());
+ Assert.assertEquals(1, regex.getEmittedObjectCount());
+ }
+
+ @Test
+ public void testParserInvalidInputMetricVerification()
+ {
+ regex.beginWindow(0);
+ Assert.assertEquals(0, regex.getIncomingTuplesCount());
+ Assert.assertEquals(0, regex.getErrorTupleCount());
+ Assert.assertEquals(0, regex.getEmittedObjectCount());
+ String tuple = "{" + "\"id\": 2" + "}";
+ regex.in.process(tuple.getBytes());
+ regex.endWindow();
+ Assert.assertEquals(1, regex.getIncomingTuplesCount());
+ Assert.assertEquals(1, regex.getErrorTupleCount());
+ Assert.assertEquals(0, regex.getEmittedObjectCount());
+ }
+
+ @Test
+ public void testParserMetricResetVerification()
+ {
+ Assert.assertEquals(0, regex.getIncomingTuplesCount());
+ Assert.assertEquals(0, regex.getErrorTupleCount());
+ Assert.assertEquals(0, regex.getEmittedObjectCount());
+ String tuple = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717] " +
+ "2015:10:01:03:14:49 101 sign-in_id=11111@psop.com ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " +
+ "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik";
+ regex.beginWindow(0);
+ regex.in.process(tuple.getBytes());
+ regex.endWindow();
+ Assert.assertEquals(1, regex.getIncomingTuplesCount());
+ Assert.assertEquals(0, regex.getErrorTupleCount());
+ Assert.assertEquals(1, regex.getEmittedObjectCount());
+ regex.beginWindow(1);
+ Assert.assertEquals(0, regex.getIncomingTuplesCount());
+ Assert.assertEquals(0, regex.getErrorTupleCount());
+ Assert.assertEquals(0, regex.getEmittedObjectCount());
+ regex.in.process(tuple.getBytes());
+ Assert.assertEquals(1, regex.getIncomingTuplesCount());
+ Assert.assertEquals(0, regex.getErrorTupleCount());
+ Assert.assertEquals(1, regex.getEmittedObjectCount());
+ regex.endWindow();
+ }
+
+ public static class ServerLog
+ {
+ private String message;
+ private Date date;
+ private int id;
+
+ public String getMessage()
+ {
+ return message;
+ }
+
+ public void setMessage(String message)
+ {
+ this.message = message;
+ }
+
+ public int getId()
+ {
+ return id;
+ }
+
+ public void setId(int id)
+ {
+ this.id = id;
+ }
+
+ public Date getDate()
+ {
+ return date;
+ }
+
+ public void setDate(Date date)
+ {
+ this.date = date;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ffba8a22/contrib/src/test/resources/RegexSplitterschema.json
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/RegexSplitterschema.json b/contrib/src/test/resources/RegexSplitterschema.json
new file mode 100644
index 0000000..e62b6c4
--- /dev/null
+++ b/contrib/src/test/resources/RegexSplitterschema.json
@@ -0,0 +1,20 @@
+{
+ "fields": [
+ {
+ "name": "date",
+ "type": "Date",
+ "constraints": {
+ "format": "yyyy:MM:dd:hh:mm:ss"
+ }
+ },
+ {
+ "name": "id",
+ "type": "Integer"
+ },
+ {
+ "name": "message",
+ "type": "String"
+
+ }
+ ]
+}
\ No newline at end of file
[2/2] apex-malhar git commit: Merge branch
'APEXMALHAR-2218-RegexSplitter' of
https://github.com/venkateshkottapalli/apex-malhar
Posted by sh...@apache.org.
Merge branch 'APEXMALHAR-2218-RegexSplitter' of https://github.com/venkateshkottapalli/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/23061c22
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/23061c22
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/23061c22
Branch: refs/heads/master
Commit: 23061c224d432b258580e6ab04ab42c0589da86c
Parents: ec7b480 ffba8a2
Author: shubham <sh...@gmail.com>
Authored: Mon Feb 27 15:03:12 2017 +0530
Committer: shubham <sh...@gmail.com>
Committed: Mon Feb 27 15:03:12 2017 +0530
----------------------------------------------------------------------
.../datatorrent/contrib/parser/RegexParser.java | 234 +++++++++++++++++
.../contrib/parser/RegexParserTest.java | 261 +++++++++++++++++++
.../src/test/resources/RegexSplitterschema.json | 20 ++
3 files changed, 515 insertions(+)
----------------------------------------------------------------------