You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by tu...@apache.org on 2017/01/13 10:47:26 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2377-Move LogParser to
org.apache.apex.malhar.contrib.parser package
Repository: apex-malhar
Updated Branches:
refs/heads/master ca6995ca4 -> 52510b0f8
APEXMALHAR-2377-Move LogParser to org.apache.apex.malhar.contrib.parser package
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/12c4bb11
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/12c4bb11
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/12c4bb11
Branch: refs/heads/master
Commit: 12c4bb11af9854c295668ec759dcb7dd49a70838
Parents: 113978f
Author: jogshraddha <jo...@gmail.com>
Authored: Mon Dec 26 18:18:24 2016 +0530
Committer: jogshraddha <jo...@gmail.com>
Committed: Mon Dec 26 18:19:45 2016 +0530
----------------------------------------------------------------------
.../datatorrent/contrib/parser/LogParser.java | 235 -------------------
.../contrib/parser/LogSchemaDetails.java | 234 ------------------
.../apex/malhar/contrib/parser/LogParser.java | 235 +++++++++++++++++++
.../malhar/contrib/parser/LogSchemaDetails.java | 234 ++++++++++++++++++
.../contrib/parser/LogParserTest.java | 168 -------------
.../malhar/contrib/parser/LogParserTest.java | 171 ++++++++++++++
6 files changed, 640 insertions(+), 637 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/12c4bb11/contrib/src/main/java/com/datatorrent/contrib/parser/LogParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/LogParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/LogParser.java
deleted file mode 100644
index a1f1290..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/LogParser.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.parser;
-
-import java.io.IOException;
-import javax.validation.constraints.NotNull;
-import org.codehaus.jettison.json.JSONException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.commons.lang3.CharEncoding;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.classification.InterfaceStability;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.datatorrent.api.AutoMetric;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.parser.Parser;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Operator that parses a log string tuple against the
- * a specified json schema and emits POJO on a parsed port and tuples that could not be
- * parsed on error port.<br>
- * <b>Properties</b><br>
- * <b>jsonSchema</b>:schema as a string<br>
- * <b>clazz</b>:Pojo class in case of user specified schema<br>
- * <b>Ports</b> <br>
- * <b>in</b>:input tuple as a String. Each tuple represents a log<br>
- * <b>parsedOutput</b>:tuples that are validated against the specified schema are emitted
- * as POJO on this port<br>
- * <b>err</b>:tuples that do not confine to log format are emitted on this port as
- * KeyValPair<String,String><br>
- * Key being the tuple and Val being the reason.
- */
-@InterfaceStability.Unstable
-public class LogParser extends Parser<byte[], KeyValPair<String, String>>
-{
- private transient Class<?> clazz;
-
- @NotNull
- private String logFileFormat;
-
- private String encoding;
-
- private LogSchemaDetails logSchemaDetails;
-
- private transient ObjectMapper objMapper;
-
- @Override
- public Object convert(byte[] tuple)
- {
- throw new UnsupportedOperationException("Not supported");
- }
-
- @Override
- public KeyValPair<String, String> processErrorTuple(byte[] bytes)
- {
- return null;
- }
-
- /**
- * output port to emit valid records as POJO
- */
- public transient DefaultOutputPort<Object> parsedOutput = new DefaultOutputPort<Object>()
- {
- public void setup(Context.PortContext context)
- {
- clazz = context.getValue(Context.PortContext.TUPLE_CLASS);
- }
- };
-
- /**
- * metric to keep count of number of tuples emitted on {@link #parsedOutput}
- * port
- */
- @AutoMetric
- long parsedOutputCount;
-
- @Override
- public void beginWindow(long windowId)
- {
- super.beginWindow(windowId);
- parsedOutputCount = 0;
- }
-
- @Override
- public void setup(Context.OperatorContext context)
- {
- objMapper = new ObjectMapper();
- encoding = encoding != null ? encoding : CharEncoding.UTF_8;
- setupLog();
- }
-
- @Override
- public void processTuple(byte[] inputTuple)
- {
- if (inputTuple == null) {
- this.emitError(null, "null tuple");
- return;
- }
- String incomingString = "";
- try {
- incomingString = new String(inputTuple, encoding);
- if (StringUtils.isBlank(incomingString)) {
- this.emitError(incomingString, "Blank tuple");
- return;
- }
- logger.debug("Input string {} ", incomingString);
- logger.debug("Parsing with log format {}", this.geLogFileFormat());
- if (this.logSchemaDetails != null && clazz != null) {
- if (parsedOutput.isConnected()) {
- parsedOutput.emit(objMapper.readValue(this.logSchemaDetails.createJsonFromLog(incomingString).toString().getBytes(), clazz));
- parsedOutputCount++;
- }
- }
- } catch (NullPointerException | IOException | JSONException e) {
- this.emitError(incomingString, e.getMessage());
- logger.error("Failed to parse log tuple {}, Exception = {} ", inputTuple, e);
- }
- }
-
- /**
- * Emits error on error port
- * @param tuple
- * @param errorMsg
- */
- public void emitError(String tuple, String errorMsg)
- {
- if (err.isConnected()) {
- err.emit(new KeyValPair<String, String>(tuple, errorMsg));
- }
- errorTupleCount++;
- }
-
- /**
- * Setup for the logs according to the logFileFormat
- */
- public void setupLog()
- {
- try {
- //parse the schema from logFileFormat string
- this.logSchemaDetails = new LogSchemaDetails(logFileFormat);
- } catch (IllegalArgumentException e) {
- logger.error("Error while initializing the custom log format " + e.getMessage());
- }
- }
-
- /**
- * Set log file format required for parsing the log
- * @param logFileFormat
- */
- public void setLogFileFormat(String logFileFormat)
- {
- this.logFileFormat = logFileFormat;
- }
-
- /**
- * Get log file format required for parsing the log
- * @return logFileFormat
- */
- public String geLogFileFormat()
- {
- return logFileFormat;
- }
-
- /**
- * Get encoding parameter for converting tuple into String
- * @return logSchemaDetails
- */
- public String getEncoding()
- {
- return encoding;
- }
-
- /**
- * Set encoding parameter for converting tuple into String
- * @param encoding
- */
- public void setEncoding(String encoding)
- {
- this.encoding = encoding;
- }
-
- /**
- * Get log schema details (field, regex etc)
- * @return logSchemaDetails
- */
- public LogSchemaDetails getLogSchemaDetails() {
- return logSchemaDetails;
- }
-
- /**
- * Set log schema details like (fields and regex)
- * @param logSchemaDetails
- */
- public void setLogSchemaDetails(LogSchemaDetails logSchemaDetails) {
- this.logSchemaDetails = logSchemaDetails;
- }
-
- /**
- * Get the class that needs to be formatted
- * @return Class<?>
- */
- public Class<?> getClazz()
- {
- return clazz;
- }
-
- /**
- * Set the class of tuple that needs to be formatted
- * @param clazz
- */
- public void setClazz(Class<?> clazz)
- {
- this.clazz = clazz;
- }
-
- private static final Logger logger = LoggerFactory.getLogger(LogParser.class);
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/12c4bb11/contrib/src/main/java/com/datatorrent/contrib/parser/LogSchemaDetails.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/LogSchemaDetails.java b/contrib/src/main/java/com/datatorrent/contrib/parser/LogSchemaDetails.java
deleted file mode 100644
index 2cd9cb4..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/LogSchemaDetails.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.parser;
-
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * <p>
- * This is schema that defines fields and their regex
- * The operators use this information to validate the incoming tuples.
- * Information from JSON schema is saved in this object and is used by the
- * operators
- * <p>
- * <br>
- * <br>
- * Example schema <br>
- * <br>
- * {@code{ "fields": [{"field": "host","regex": "^([0-9.]+)"},
- * {"field": "userName","regex": "(.*?)"},
- * {"field": "request","regex": "\"((?:[^\"]|\")+)\""},
- * {"field": "statusCode","regex": "(\\d{3})"},
- * {"field": "bytes","regex": "(\\d+|-)"}]}
- */
-public class LogSchemaDetails
-{
- /**
- * This holds the list of field names in the same order as in the schema
- */
- private List<String> fieldNames = new LinkedList();
-
- private List<Field> fields = new LinkedList();
-
- private Pattern compiledPattern = null;
-
- /**
- * This holds regex pattern for the schema
- */
- private String pattern;
-
- public LogSchemaDetails(String json)
- {
- try {
- initialize(json);
- createPattern();
- this.compiledPattern = Pattern.compile(this.pattern);
- } catch (JSONException | IOException e) {
- logger.error("{}", e);
- throw new IllegalArgumentException(e);
- }
- }
-
- /**
- * For a given json string, this method sets the field members
- * @param json
- * @throws JSONException
- * @throws IOException
- */
- private void initialize(String json) throws JSONException, IOException
- {
- JSONObject jsonObject = new JSONObject(json);
- JSONArray fieldArray = jsonObject.getJSONArray("fields");
-
- for(int i = 0; i < fieldArray.length(); i++) {
- JSONObject obj = fieldArray.getJSONObject(i);
- Field field = new Field(obj.getString("field"), obj.getString("regex"));
- this.fields.add(field);
- this.fieldNames.add(field.name);
- }
- }
-
- /**
- * creates regex group pattern from the regex given for each field
- */
- public void createPattern()
- {
- StringBuffer pattern = new StringBuffer();
- for(Field field: this.getFields()) {
- pattern.append(field.getRegex()).append(" ");
- }
- logger.info("Created pattern for parsing the log {}", pattern.toString().trim());
- this.setPattern(pattern.toString().trim());
- }
-
- /**
- * creates json object by matching the log with given pattern
- * @param log
- * @return logObject
- * @throws Exception
- */
- public JSONObject createJsonFromLog(String log) throws JSONException
- {
- JSONObject logObject = null;
- if (this.compiledPattern != null) {
- Matcher m = this.compiledPattern.matcher(log);
- int count = m.groupCount();
- if (m.find()) {
- int i = 1;
- logObject = new JSONObject();
- for(String field: this.getFieldNames()) {
- if (i > count) {
- break;
- }
- logObject.put(field, m.group(i));
- i++;
- }
- }
- }
- return logObject;
- }
-
- /**
- * Get the list of fieldNames mentioned in schema
- * @return fieldNames
- */
- public List<String> getFieldNames()
- {
- return fieldNames;
- }
-
- /**
- * Get the list of fields (field, regex) mentioned in schema
- * @return fields
- */
- public List<Field> getFields()
- {
- return fields;
- }
-
- /**
- * Get the regex pattern for the schema
- * @return pattern
- */
- public String getPattern()
- {
- return pattern;
- }
-
- /**
- * Set the regex pattern for schema
- * @param pattern
- */
- public void setPattern(String pattern)
- {
- this.pattern = pattern;
- }
-
- public class Field
- {
- /**
- * name of the field
- */
- private String name;
- /**
- * regular expression for the field
- */
- private String regex;
-
-
- public Field(String name, String regex)
- {
- this.name = name;
- this.regex = regex;
- }
-
- /**
- * Get the name of the field
- * @return name
- */
- public String getName()
- {
- return name;
- }
-
- /**
- * Set the name of the field
- * @param name
- */
- public void setName(String name)
- {
- this.name = name;
- }
-
- /**
- * Get the regular expression of the field
- * @return regex
- */
- public String getRegex()
- {
- return regex;
- }
-
- /**
- * Set the regular expression of the field
- * @param regex
- */
- public void setRegex(String regex)
- {
- this.regex = regex;
- }
-
- @Override
- public String toString()
- {
- return "Fields [name=" + name + ", regex=" + regex +"]";
- }
- }
-
- private static final Logger logger = LoggerFactory.getLogger(LogSchemaDetails.class);
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/12c4bb11/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogParser.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogParser.java
new file mode 100644
index 0000000..7a4e906
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogParser.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser;
+
+import java.io.IOException;
+import javax.validation.constraints.NotNull;
+import org.codehaus.jettison.json.JSONException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.CharEncoding;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Operator that parses a log string tuple against the
+ * a specified json schema and emits POJO on a parsed port and tuples that could not be
+ * parsed on error port.<br>
+ * <b>Properties</b><br>
+ * <b>jsonSchema</b>:schema as a string<br>
+ * <b>clazz</b>:Pojo class in case of user specified schema<br>
+ * <b>Ports</b> <br>
+ * <b>in</b>:input tuple as a String. Each tuple represents a log<br>
+ * <b>parsedOutput</b>:tuples that are validated against the specified schema are emitted
+ * as POJO on this port<br>
+ * <b>err</b>:tuples that do not confine to log format are emitted on this port as
+ * KeyValPair<String,String><br>
+ * Key being the tuple and Val being the reason.
+ */
+@InterfaceStability.Unstable
+public class LogParser extends Parser<byte[], KeyValPair<String, String>>
+{
+ private transient Class<?> clazz;
+
+ @NotNull
+ private String logFileFormat;
+
+ private String encoding;
+
+ private LogSchemaDetails logSchemaDetails;
+
+ private transient ObjectMapper objMapper;
+
+ @Override
+ public Object convert(byte[] tuple)
+ {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ @Override
+ public KeyValPair<String, String> processErrorTuple(byte[] bytes)
+ {
+ return null;
+ }
+
+ /**
+ * output port to emit valid records as POJO
+ */
+ public transient DefaultOutputPort<Object> parsedOutput = new DefaultOutputPort<Object>()
+ {
+ public void setup(Context.PortContext context)
+ {
+ clazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+ };
+
+ /**
+ * metric to keep count of number of tuples emitted on {@link #parsedOutput}
+ * port
+ */
+ @AutoMetric
+ long parsedOutputCount;
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ parsedOutputCount = 0;
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ objMapper = new ObjectMapper();
+ encoding = encoding != null ? encoding : CharEncoding.UTF_8;
+ setupLog();
+ }
+
+ @Override
+ public void processTuple(byte[] inputTuple)
+ {
+ if (inputTuple == null) {
+ this.emitError(null, "null tuple");
+ return;
+ }
+ String incomingString = "";
+ try {
+ incomingString = new String(inputTuple, encoding);
+ if (StringUtils.isBlank(incomingString)) {
+ this.emitError(incomingString, "Blank tuple");
+ return;
+ }
+ logger.debug("Input string {} ", incomingString);
+ logger.debug("Parsing with log format {}", this.geLogFileFormat());
+ if (this.logSchemaDetails != null && clazz != null) {
+ if (parsedOutput.isConnected()) {
+ parsedOutput.emit(objMapper.readValue(this.logSchemaDetails.createJsonFromLog(incomingString).toString().getBytes(), clazz));
+ parsedOutputCount++;
+ }
+ }
+ } catch (NullPointerException | IOException | JSONException e) {
+ this.emitError(incomingString, e.getMessage());
+ logger.error("Failed to parse log tuple {}, Exception = {} ", inputTuple, e);
+ }
+ }
+
+ /**
+ * Emits error on error port
+ * @param tuple
+ * @param errorMsg
+ */
+ public void emitError(String tuple, String errorMsg)
+ {
+ if (err.isConnected()) {
+ err.emit(new KeyValPair<String, String>(tuple, errorMsg));
+ }
+ errorTupleCount++;
+ }
+
+ /**
+ * Setup for the logs according to the logFileFormat
+ */
+ public void setupLog()
+ {
+ try {
+ //parse the schema from logFileFormat string
+ this.logSchemaDetails = new LogSchemaDetails(logFileFormat);
+ } catch (IllegalArgumentException e) {
+ logger.error("Error while initializing the custom log format " + e.getMessage());
+ }
+ }
+
+ /**
+ * Set log file format required for parsing the log
+ * @param logFileFormat
+ */
+ public void setLogFileFormat(String logFileFormat)
+ {
+ this.logFileFormat = logFileFormat;
+ }
+
+ /**
+ * Get log file format required for parsing the log
+ * @return logFileFormat
+ */
+ public String geLogFileFormat()
+ {
+ return logFileFormat;
+ }
+
+ /**
+ * Get encoding parameter for converting tuple into String
+ * @return logSchemaDetails
+ */
+ public String getEncoding()
+ {
+ return encoding;
+ }
+
+ /**
+ * Set encoding parameter for converting tuple into String
+ * @param encoding
+ */
+ public void setEncoding(String encoding)
+ {
+ this.encoding = encoding;
+ }
+
+ /**
+ * Get log schema details (field, regex etc)
+ * @return logSchemaDetails
+ */
+ public LogSchemaDetails getLogSchemaDetails() {
+ return logSchemaDetails;
+ }
+
+ /**
+ * Set log schema details like (fields and regex)
+ * @param logSchemaDetails
+ */
+ public void setLogSchemaDetails(LogSchemaDetails logSchemaDetails) {
+ this.logSchemaDetails = logSchemaDetails;
+ }
+
+ /**
+ * Get the class that needs to be formatted
+ * @return Class<?>
+ */
+ public Class<?> getClazz()
+ {
+ return clazz;
+ }
+
+ /**
+ * Set the class of tuple that needs to be formatted
+ * @param clazz
+ */
+ public void setClazz(Class<?> clazz)
+ {
+ this.clazz = clazz;
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(LogParser.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/12c4bb11/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogSchemaDetails.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogSchemaDetails.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogSchemaDetails.java
new file mode 100644
index 0000000..8120e00
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogSchemaDetails.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * <p>
+ * This is schema that defines fields and their regex
+ * The operators use this information to validate the incoming tuples.
+ * Information from JSON schema is saved in this object and is used by the
+ * operators
+ * <p>
+ * <br>
+ * <br>
+ * Example schema <br>
+ * <br>
+ * {@code{ "fields": [{"field": "host","regex": "^([0-9.]+)"},
+ * {"field": "userName","regex": "(.*?)"},
+ * {"field": "request","regex": "\"((?:[^\"]|\")+)\""},
+ * {"field": "statusCode","regex": "(\\d{3})"},
+ * {"field": "bytes","regex": "(\\d+|-)"}]}
+ */
+public class LogSchemaDetails
+{
+ /**
+ * This holds the list of field names in the same order as in the schema
+ */
+ private List<String> fieldNames = new LinkedList();
+
+ private List<Field> fields = new LinkedList();
+
+ private Pattern compiledPattern = null;
+
+ /**
+ * This holds regex pattern for the schema
+ */
+ private String pattern;
+
+ public LogSchemaDetails(String json)
+ {
+ try {
+ initialize(json);
+ createPattern();
+ this.compiledPattern = Pattern.compile(this.pattern);
+ } catch (JSONException | IOException e) {
+ logger.error("{}", e);
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * For a given json string, this method sets the field members
+ * @param json
+ * @throws JSONException
+ * @throws IOException
+ */
+ private void initialize(String json) throws JSONException, IOException
+ {
+ JSONObject jsonObject = new JSONObject(json);
+ JSONArray fieldArray = jsonObject.getJSONArray("fields");
+
+ for(int i = 0; i < fieldArray.length(); i++) {
+ JSONObject obj = fieldArray.getJSONObject(i);
+ Field field = new Field(obj.getString("field"), obj.getString("regex"));
+ this.fields.add(field);
+ this.fieldNames.add(field.name);
+ }
+ }
+
+ /**
+ * creates regex group pattern from the regex given for each field
+ */
+ public void createPattern()
+ {
+ StringBuffer pattern = new StringBuffer();
+ for(Field field: this.getFields()) {
+ pattern.append(field.getRegex()).append(" ");
+ }
+ logger.info("Created pattern for parsing the log {}", pattern.toString().trim());
+ this.setPattern(pattern.toString().trim());
+ }
+
+ /**
+ * creates json object by matching the log with given pattern
+ * @param log
+ * @return logObject
+ * @throws Exception
+ */
+ public JSONObject createJsonFromLog(String log) throws JSONException
+ {
+ JSONObject logObject = null;
+ if (this.compiledPattern != null) {
+ Matcher m = this.compiledPattern.matcher(log);
+ int count = m.groupCount();
+ if (m.find()) {
+ int i = 1;
+ logObject = new JSONObject();
+ for(String field: this.getFieldNames()) {
+ if (i > count) {
+ break;
+ }
+ logObject.put(field, m.group(i));
+ i++;
+ }
+ }
+ }
+ return logObject;
+ }
+
+ /**
+ * Get the list of fieldNames mentioned in schema
+ * @return fieldNames
+ */
+ public List<String> getFieldNames()
+ {
+ return fieldNames;
+ }
+
+ /**
+ * Get the list of fields (field, regex) mentioned in schema
+ * @return fields
+ */
+ public List<Field> getFields()
+ {
+ return fields;
+ }
+
+ /**
+ * Get the regex pattern for the schema
+ * @return pattern
+ */
+ public String getPattern()
+ {
+ return pattern;
+ }
+
+ /**
+ * Set the regex pattern for schema
+ * @param pattern
+ */
+ public void setPattern(String pattern)
+ {
+ this.pattern = pattern;
+ }
+
+ public class Field
+ {
+ /**
+ * name of the field
+ */
+ private String name;
+ /**
+ * regular expression for the field
+ */
+ private String regex;
+
+
+ public Field(String name, String regex)
+ {
+ this.name = name;
+ this.regex = regex;
+ }
+
+ /**
+ * Get the name of the field
+ * @return name
+ */
+ public String getName()
+ {
+ return name;
+ }
+
+ /**
+ * Set the name of the field
+ * @param name
+ */
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ /**
+ * Get the regular expression of the field
+ * @return regex
+ */
+ public String getRegex()
+ {
+ return regex;
+ }
+
+ /**
+ * Set the regular expression of the field
+ * @param regex
+ */
+ public void setRegex(String regex)
+ {
+ this.regex = regex;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Fields [name=" + name + ", regex=" + regex +"]";
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(LogSchemaDetails.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/12c4bb11/contrib/src/test/java/com/datatorrent/contrib/parser/LogParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/LogParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/LogParserTest.java
deleted file mode 100644
index 53ae033..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/parser/LogParserTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.parser;
-
-import org.codehaus.jettison.json.JSONException;
-import org.jooq.exception.IOException;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import com.datatorrent.lib.appdata.schemas.SchemaUtils;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-public class LogParserTest
-{
- private String filename = "logSchema.json";
-
- LogParser logParser = new LogParser();
-
- private CollectorTestSink<Object> error = new CollectorTestSink<Object>();
-
- private CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
-
- @Rule
- public Watcher watcher = new Watcher();
-
- public class Watcher extends TestWatcher
- {
- @Override
- protected void starting(Description description)
- {
- super.starting(description);
- logParser.err.setSink(error);
- logParser.parsedOutput.setSink(pojoPort);
- }
-
- @Override
- protected void finished(Description description)
- {
- super.finished(description);
- error.clear();
- pojoPort.clear();
- logParser.teardown();
- }
- }
-
- @Test
- public void TestEmptyInput()
- {
- String tuple = "";
- logParser.beginWindow(0);
- logParser.in.process(tuple.getBytes());
- logParser.endWindow();
- Assert.assertEquals(0, pojoPort.collectedTuples.size());
- Assert.assertEquals(1, error.collectedTuples.size());
- }
-
- @Test
- public void TestNullInput()
- {
- logParser.beginWindow(0);
- logParser.in.process(null);
- logParser.endWindow();
- Assert.assertEquals(0, pojoPort.collectedTuples.size());
- Assert.assertEquals(1, error.collectedTuples.size());
- }
-
- @Test
- public void TestSchemaInput() throws JSONException, java.io.IOException
- {
- logParser.setLogFileFormat(SchemaUtils.jarResourceFileToString(filename));
- logParser.setup(null);
- logParser.setClazz(LogSchema.class);
- logParser.setLogSchemaDetails(new LogSchemaDetails(logParser.geLogFileFormat()));
- String log = "125.125.125.125 smith 200 1043";
- logParser.beginWindow(0);
- logParser.in.process(log.getBytes());
- logParser.endWindow();
- Assert.assertEquals(1, pojoPort.collectedTuples.size());
- Assert.assertEquals(0, error.collectedTuples.size());
- Object obj = pojoPort.collectedTuples.get(0);
- Assert.assertNotNull(obj);
- LogSchema pojo = (LogSchema) obj;
- Assert.assertEquals("125.125.125.125", pojo.getHost());
- Assert.assertEquals("smith", pojo.getUserName());
- Assert.assertEquals("200", pojo.getStatusCode());
- Assert.assertEquals("1043", pojo.getBytes());
- }
-
- @Test
- public void TestInvalidSchemaInput() throws JSONException, IOException
- {
- logParser.setLogFileFormat(SchemaUtils.jarResourceFileToString("invalidLogSchema.json"));
- logParser.setup(null);
- logParser.setClazz(LogSchema.class);
- logParser.setLogSchemaDetails(new LogSchemaDetails(logParser.geLogFileFormat()));
- String log = "125.125.125.125 smith 200 1043";
- logParser.beginWindow(0);
- logParser.in.process(log.getBytes());
- logParser.endWindow();
- Assert.assertEquals(0, pojoPort.collectedTuples.size());
- Assert.assertEquals(1, error.collectedTuples.size());
- }
-
- public static class LogSchema {
- private String host;
- private String userName;
- private String statusCode;
- private String bytes;
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public void setUserName(String username) {
- this.userName = username;
- }
-
- public String getStatusCode() {
- return statusCode;
- }
-
- public void setStatusCode(String statusCode) {
- this.statusCode = statusCode;
- }
-
- public String getBytes() {
- return bytes;
- }
-
- public void setBytes(String bytes) {
- this.bytes = bytes;
- }
-
- @Override
- public String toString()
- {
- return "LogSchema [host=" + host + ", userName=" + userName
- + ", statusCode=" + statusCode + ", bytes=" + bytes + "]";
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/12c4bb11/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/LogParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/LogParserTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/LogParserTest.java
new file mode 100644
index 0000000..996502d
--- /dev/null
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/LogParserTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser;
+
+import java.io.IOException;
+
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/** Unit tests for {@link LogParser}: rejection of empty/null input and schema-driven parsing into POJOs. */
+public class LogParserTest
+{
+  private static final String LOG_SCHEMA_FILE = "logSchema.json";
+
+  private final LogParser logParser = new LogParser();
+
+  /** Sinks for the parser's error and parsed-output ports; attached and cleared by {@link Watcher}. */
+  private final CollectorTestSink<Object> error = new CollectorTestSink<Object>();
+  private final CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
+
+  @Rule
+  public Watcher watcher = new Watcher();
+
+  /** Attaches the sinks to the parser's ports before each test; clears them and tears down the parser after. */
+  public class Watcher extends TestWatcher
+  {
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      logParser.err.setSink(error);
+      logParser.parsedOutput.setSink(pojoPort);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+      error.clear();
+      pojoPort.clear();
+      logParser.teardown();
+    }
+  }
+
+  @Test
+  public void TestEmptyInput()
+  {
+    String tuple = "";
+    logParser.beginWindow(0);
+    logParser.in.process(tuple.getBytes());
+    logParser.endWindow();
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestNullInput()
+  {
+    logParser.beginWindow(0);
+    logParser.in.process(null);
+    logParser.endWindow();
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestSchemaInput() throws JSONException, IOException
+  {
+    logParser.setLogFileFormat(SchemaUtils.jarResourceFileToString(LOG_SCHEMA_FILE));
+    logParser.setup(null);
+    logParser.setClazz(LogSchema.class);
+    logParser.setLogSchemaDetails(new LogSchemaDetails(logParser.geLogFileFormat()));
+    String log = "125.125.125.125 smith 200 1043";
+    logParser.beginWindow(0);
+    logParser.in.process(log.getBytes());
+    logParser.endWindow();
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+    LogSchema pojo = (LogSchema) pojoPort.collectedTuples.get(0);
+    Assert.assertNotNull(pojo);
+    Assert.assertEquals("125.125.125.125", pojo.getHost());
+    Assert.assertEquals("smith", pojo.getUserName());
+    Assert.assertEquals("200", pojo.getStatusCode());
+    Assert.assertEquals("1043", pojo.getBytes());
+  }
+
+  @Test
+  public void TestInvalidSchemaInput() throws JSONException, IOException
+  {
+    // Same log line as TestSchemaInput, parsed against invalidLogSchema.json: it must go to the error port.
+    logParser.setLogFileFormat(SchemaUtils.jarResourceFileToString("invalidLogSchema.json"));
+    logParser.setup(null);
+    logParser.setClazz(LogSchema.class);
+    logParser.setLogSchemaDetails(new LogSchemaDetails(logParser.geLogFileFormat()));
+    String log = "125.125.125.125 smith 200 1043";
+    logParser.beginWindow(0);
+    logParser.in.process(log.getBytes());
+    logParser.endWindow();
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  public static class LogSchema {
+    private String host;
+    private String userName;
+    private String statusCode;
+    private String bytes;
+
+    public String getHost() {
+      return host;
+    }
+
+    public void setHost(String host) {
+      this.host = host;
+    }
+
+    public String getUserName() {
+      return userName;
+    }
+
+    public void setUserName(String username) {
+      this.userName = username;
+    }
+
+    public String getStatusCode() {
+      return statusCode;
+    }
+
+    public void setStatusCode(String statusCode) {
+      this.statusCode = statusCode;
+    }
+
+    public String getBytes() {
+      return bytes;
+    }
+
+    public void setBytes(String bytes) {
+      this.bytes = bytes;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "LogSchema [host=" + host + ", userName=" + userName
+          + ", statusCode=" + statusCode + ", bytes=" + bytes + "]";
+    }
+  }
+}
[2/2] apex-malhar git commit: Merge branch
'APEXMALHAR-2377-LogParser-package' of
https://github.com/jogshraddha/apex-malhar
Posted by tu...@apache.org.
Merge branch 'APEXMALHAR-2377-LogParser-package' of https://github.com/jogshraddha/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/52510b0f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/52510b0f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/52510b0f
Branch: refs/heads/master
Commit: 52510b0f8fbe60e183ee2c59780a2244a3c9121d
Parents: ca6995c 12c4bb1
Author: Tushar R. Gosavi <tu...@apache.org>
Authored: Fri Jan 13 15:45:54 2017 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Fri Jan 13 15:45:54 2017 +0530
----------------------------------------------------------------------
.../datatorrent/contrib/parser/LogParser.java | 235 -------------------
.../contrib/parser/LogSchemaDetails.java | 234 ------------------
.../apex/malhar/contrib/parser/LogParser.java | 235 +++++++++++++++++++
.../malhar/contrib/parser/LogSchemaDetails.java | 234 ++++++++++++++++++
.../contrib/parser/LogParserTest.java | 168 -------------
.../malhar/contrib/parser/LogParserTest.java | 171 ++++++++++++++
6 files changed, 640 insertions(+), 637 deletions(-)
----------------------------------------------------------------------