You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/12/01 17:59:56 UTC
[apex-malhar] branch master updated: APEXMALHAR-2550 Made
NycTaxiDataReader and NycTaxiCsvParser more resilient to data with bad
format
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push:
new aa5c7c4 APEXMALHAR-2550 Made NycTaxiDataReader and NycTaxiCsvParser more resilient to data with bad format
aa5c7c4 is described below
commit aa5c7c42f86966d2954dfb988a85099ce2c9b5c8
Author: David Yan <da...@apache.org>
AuthorDate: Tue Nov 28 20:32:46 2017 -0800
APEXMALHAR-2550 Made NycTaxiDataReader and NycTaxiCsvParser more resilient to data with bad format
---
.../apache/apex/examples/nyctaxi/NycTaxiCsvParser.java | 10 ++++++++--
.../apex/examples/nyctaxi/NycTaxiDataReader.java | 18 ++++++++++--------
2 files changed, 18 insertions(+), 10 deletions(-)
diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java
index 3e13e76..7ecf62a 100644
--- a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java
@@ -21,6 +21,9 @@ package org.apache.apex.examples.nyctaxi;
import java.util.HashMap;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.commons.lang3.StringUtils;
import com.datatorrent.api.DefaultInputPort;
@@ -40,17 +43,20 @@ public class NycTaxiCsvParser extends BaseOperator
@Override
public void process(String tuple)
{
- String[] values = tuple.split(",");
+ String[] values = tuple.split(",", -1);
Map<String, String> outputTuple = new HashMap<>();
- if (StringUtils.isNumeric(values[0])) {
+ if (values.length > 18 && StringUtils.isNumeric(values[0])) {
outputTuple.put("pickup_time", values[1]);
outputTuple.put("pickup_lon", values[5]);
outputTuple.put("pickup_lat", values[6]);
outputTuple.put("total_fare", values[18]);
output.emit(outputTuple);
+ } else {
+ LOG.warn("Dropping tuple with unrecognized format: {}", tuple);
}
}
};
public final transient DefaultOutputPort<Map<String, String>> output = new DefaultOutputPort<>();
+ private static final Logger LOG = LoggerFactory.getLogger(NycTaxiCsvParser.class);
}
diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java
index b2168f4..1d4114c 100644
--- a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java
@@ -54,14 +54,16 @@ public class NycTaxiDataReader extends LineByLineFileInputOperator
protected String readEntity() throws IOException
{
String line = super.readEntity();
- String[] fields = line.split(",");
- String timestamp = fields[1];
- if (currentTimestamp == null) {
- currentTimestamp = timestamp;
- } else if (timestamp != currentTimestamp) {
- // suspend emit until the next streaming window when timestamp is different from the current timestamp.
- suspendEmit = true;
- currentTimestamp = timestamp;
+ String[] fields = line.split(",", -1);
+ if (fields.length > 1) {
+ String timestamp = fields[1];
+ if (currentTimestamp == null) {
+ currentTimestamp = timestamp;
+ } else if (timestamp != currentTimestamp) {
+ // suspend emit until the next streaming window when timestamp is different from the current timestamp.
+ suspendEmit = true;
+ currentTimestamp = timestamp;
+ }
}
return line;
}
--
To stop receiving notification emails like this one, please contact
commits@apex.apache.org.