You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/03/20 08:39:24 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2376 Add Common Log support
in LogParser operator
Repository: apex-malhar
Updated Branches:
refs/heads/master 89b29378e -> d45e8369c
APEXMALHAR-2376 Add Common Log support in LogParser operator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f29c7d45
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f29c7d45
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f29c7d45
Branch: refs/heads/master
Commit: f29c7d45b92cacda80de8bb218d5db8235fb2d68
Parents: 7f1abca
Author: akshay <ak...@synerzip.com>
Authored: Tue Jan 31 12:36:38 2017 +0530
Committer: akshay <ak...@synerzip.com>
Committed: Wed Mar 15 11:12:30 2017 +0530
----------------------------------------------------------------------
.../malhar/contrib/parser/CommonLogParser.java | 84 +++++++++++++
.../malhar/contrib/parser/log/CommonLog.java | 120 ++++++++++++++++++
.../contrib/parser/CommonLogParserTest.java | 124 +++++++++++++++++++
3 files changed, 328 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f29c7d45/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/CommonLogParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/CommonLogParser.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/CommonLogParser.java
new file mode 100644
index 0000000..ec20810
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/CommonLogParser.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.contrib.parser.log.CommonLog;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+
+/**
+ * Operator that parses a log string tuple against the
+ * a specified json schema and emits POJO on a parsed port and tuples that could not be
+ * parsed on error port.<br>
+ * <b>Properties</b><br>
+ * <b>jsonSchema</b>:schema as a string<br>
+ * <b>pojoClass</b>:Pojo class in case of user specified schema<br>
+ * <b>Ports</b> <br>
+ * <b>in</b>:input tuple as a String. Each tuple represents a log<br>
+ * <b>parsedOutput</b>:tuples that are validated against the specified schema are emitted
+ * as POJO on this port<br>
+ * <b>err</b>:tuples that do not confine to log format are emitted on this port as
+ * KeyValPair<String,String><br>
+ * Key being the tuple and Val being the reason.
+ */
+@InterfaceStability.Evolving
+public class CommonLogParser extends LogParser
+{
+ private static final Logger logger = LoggerFactory.getLogger(CommonLogParser.class);
+
+ private String schema="{\n" +
+ " \"fields\": [{\n" +
+ " \"field\": \"host\",\n" +
+ " \"regex\": \"^([0-9.]+)\"\n" +
+ " }, {\n" +
+ " \"field\": \"rfc931\",\n" +
+ " \"regex\": \"(\\\\S+)\"\n" +
+ " }, {\n" +
+ " \"field\": \"username\",\n" +
+ " \"regex\": \"(\\\\S+)\"\n" +
+ " }, {\n" +
+ " \"field\": \"datetime\",\n" +
+ " \"regex\": \"\\\\[(.*?)\\\\]\"\n" +
+ " },{\n" +
+ " \"field\": \"request\",\n" +
+ " \"regex\": \"\\\"((?:[^\\\"]|\\\")+)\\\"\"\n" +
+ " },{\n" +
+ " \"field\": \"statusCode\",\n" +
+ " \"regex\": \"(\\\\d{3})\"\n" +
+ " },{\n" +
+ " \"field\": \"bytes\",\n" +
+ " \"regex\": \"(\\\\d+|-)\"\n" +
+ " }]\n" +
+ "}";
+
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ this.setLogFileFormat(schema);
+ super.setup(context);
+ super.setClazz(CommonLog.class);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f29c7d45/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/log/CommonLog.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/log/CommonLog.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/log/CommonLog.java
new file mode 100644
index 0000000..aa44a76
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/log/CommonLog.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser.log;
+
+import org.apache.apex.malhar.contrib.parser.LogParser;
+import org.apache.hadoop.classification.InterfaceStability;
+
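/**
 * POJO holding the fields of a single
 * <a href="https://en.wikipedia.org/wiki/Common_Log_Format">Common Log Format</a> entry; every
 * field is kept as the raw string captured from the log line.
 * {@link org.apache.apex.malhar.contrib.parser.CommonLogParser} emits instances of this class.
 * To use this format with the {@link LogParser} operator, set its "logFileFormat" property to "COMMON".
 */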
+@InterfaceStability.Evolving
public class CommonLog
{
  // Each field holds the raw text captured for the matching Common Log Format column;
  // no conversion to numbers or dates takes place.
  private String host;
  private String rfc931;
  private String username;
  private String datetime;
  private String request;
  private String statusCode;
  private String bytes;

  /** @return the client host column */
  public String getHost()
  {
    return host;
  }

  /** @return the rfc931 (remote identity) column */
  public String getRfc931()
  {
    return rfc931;
  }

  /** @return the user name column */
  public String getUsername()
  {
    return username;
  }

  /** @return the timestamp column */
  public String getDatetime()
  {
    return datetime;
  }

  /** @return the request-line column */
  public String getRequest()
  {
    return request;
  }

  /** @return the HTTP status code column, as text */
  public String getStatusCode()
  {
    return statusCode;
  }

  /** @return the response size column, as text */
  public String getBytes()
  {
    return bytes;
  }

  /** @param host the client host column */
  public void setHost(String host)
  {
    this.host = host;
  }

  /** @param rfc931 the rfc931 (remote identity) column */
  public void setRfc931(String rfc931)
  {
    this.rfc931 = rfc931;
  }

  /** @param username the user name column */
  public void setUsername(String username)
  {
    this.username = username;
  }

  /** @param datetime the timestamp column */
  public void setDatetime(String datetime)
  {
    this.datetime = datetime;
  }

  /** @param request the request-line column */
  public void setRequest(String request)
  {
    this.request = request;
  }

  /** @param statusCode the HTTP status code column, as text */
  public void setStatusCode(String statusCode)
  {
    this.statusCode = statusCode;
  }

  /** @param bytes the response size column, as text */
  public void setBytes(String bytes)
  {
    this.bytes = bytes;
  }

  /**
   * Renders every column through its getter; unset columns print as {@code null}.
   *
   * @return a single-line, human-readable description of this entry
   */
  @Override
  public String toString()
  {
    StringBuilder text = new StringBuilder("CommonLog [ Host : ");
    text.append(getHost());
    text.append(", rfc931 : ").append(getRfc931());
    text.append(", userName : ").append(getUsername());
    text.append(", dateTime : ").append(getDatetime());
    text.append(", request : ").append(getRequest());
    text.append(", statusCode : ").append(getStatusCode());
    text.append(", bytes : ").append(getBytes());
    return text.append(" ]").toString();
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f29c7d45/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/CommonLogParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/CommonLogParserTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/CommonLogParserTest.java
new file mode 100644
index 0000000..ec529fe
--- /dev/null
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/CommonLogParserTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser;
+
+import java.io.IOException;
+
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.apex.malhar.contrib.parser.log.CommonLog;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+public class CommonLogParserTest
+{
+
+ CommonLogParser commonLogParser = new CommonLogParser();
+
+ private CollectorTestSink<Object> error = new CollectorTestSink<Object>();
+
+ private CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
+
+ @Rule
+ public Watcher watcher = new Watcher();
+
+ public class Watcher extends TestWatcher
+ {
+ @Override
+ protected void starting(Description description)
+ {
+ super.starting(description);
+ commonLogParser.err.setSink(error);
+ commonLogParser.parsedOutput.setSink(pojoPort);
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ super.finished(description);
+ error.clear();
+ pojoPort.clear();
+ commonLogParser.teardown();
+ }
+ }
+
+ @Test
+ public void TestEmptyInput()
+ {
+ String tuple = "";
+ commonLogParser.setup(null);
+ commonLogParser.beginWindow(0);
+ commonLogParser.in.process(tuple.getBytes());
+ commonLogParser.endWindow();
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestNullInput()
+ {
+ commonLogParser.setup(null);
+ commonLogParser.in.process(null);
+ commonLogParser.endWindow();
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestValidCommonLogInputCase() throws JSONException, IOException
+ {
+
+ commonLogParser.setup(null);
+ commonLogParser.beginWindow(0);
+ String log = "125.125.125.125 - dsmith [10/Oct/1999:21:15:05 +0500] \"GET /index.html HTTP/1.0\" 200 1043";
+ commonLogParser.in.process(log.getBytes());
+ commonLogParser.endWindow();
+ Assert.assertEquals(1, pojoPort.collectedTuples.size());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ Object obj = pojoPort.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(CommonLog.class, obj.getClass());
+ CommonLog pojo = (CommonLog)obj;
+ Assert.assertNotNull(obj);
+ Assert.assertEquals("125.125.125.125", pojo.getHost());
+ Assert.assertEquals("dsmith", pojo.getUsername());
+ Assert.assertEquals("10/Oct/1999:21:15:05 +0500", pojo.getDatetime());
+ Assert.assertEquals("GET /index.html HTTP/1.0", pojo.getRequest());
+ Assert.assertEquals("200", pojo.getStatusCode());
+ Assert.assertEquals("1043", pojo.getBytes());
+ }
+
+ @Test
+ public void TestInvalidCommonLogInput()
+ {
+ String tuple = "127.0.0.1 - dsmith 10/Oct/1999:21:15:05] \"GET /index.html HTTP/1.0\" 200 1043";
+ commonLogParser.setup(null);
+ commonLogParser.beginWindow(0);
+ commonLogParser.in.process(tuple.getBytes());
+ commonLogParser.endWindow();
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+}
[2/2] apex-malhar git commit: Merge commit 'refs/pull/545/head' of
github.com:apache/apex-malhar
Posted by ch...@apache.org.
Merge commit 'refs/pull/545/head' of github.com:apache/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d45e8369
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d45e8369
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d45e8369
Branch: refs/heads/master
Commit: d45e8369cda6449ed5da6793ea62a70972c2c71c
Parents: 89b2937 f29c7d4
Author: Chinmay Kolhatkar <ch...@apache.org>
Authored: Mon Mar 20 13:46:34 2017 +0530
Committer: Chinmay Kolhatkar <ch...@apache.org>
Committed: Mon Mar 20 13:46:34 2017 +0530
----------------------------------------------------------------------
.../malhar/contrib/parser/CommonLogParser.java | 84 +++++++++++++
.../malhar/contrib/parser/log/CommonLog.java | 120 ++++++++++++++++++
.../contrib/parser/CommonLogParserTest.java | 124 +++++++++++++++++++
3 files changed, 328 insertions(+)
----------------------------------------------------------------------