You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@apex.apache.org by th...@apache.org on 2017/12/01 17:59:56 UTC

[apex-malhar] branch master updated: APEXMALHAR-2550 Made NycTaxiDataReader and NycTaxiCsvParser more resilient to data with bad format

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git


The following commit(s) were added to refs/heads/master by this push:
     new aa5c7c4  APEXMALHAR-2550 Made NycTaxiDataReader and NycTaxiCsvParser more resilient to data with bad format
aa5c7c4 is described below

commit aa5c7c42f86966d2954dfb988a85099ce2c9b5c8
Author: David Yan <da...@apache.org>
AuthorDate: Tue Nov 28 20:32:46 2017 -0800

    APEXMALHAR-2550 Made NycTaxiDataReader and NycTaxiCsvParser more resilient to data with bad format
---
 .../apache/apex/examples/nyctaxi/NycTaxiCsvParser.java | 10 ++++++++--
 .../apex/examples/nyctaxi/NycTaxiDataReader.java       | 18 ++++++++++--------
 2 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java
index 3e13e76..7ecf62a 100644
--- a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java
@@ -21,6 +21,9 @@ package org.apache.apex.examples.nyctaxi;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.commons.lang3.StringUtils;
 
 import com.datatorrent.api.DefaultInputPort;
@@ -40,17 +43,20 @@ public class NycTaxiCsvParser extends BaseOperator
     @Override
     public void process(String tuple)
     {
-      String[] values = tuple.split(",");
+      String[] values = tuple.split(",", -1);
       Map<String, String> outputTuple = new HashMap<>();
-      if (StringUtils.isNumeric(values[0])) {
+      if (values.length > 18 && StringUtils.isNumeric(values[0])) {
         outputTuple.put("pickup_time", values[1]);
         outputTuple.put("pickup_lon", values[5]);
         outputTuple.put("pickup_lat", values[6]);
         outputTuple.put("total_fare", values[18]);
         output.emit(outputTuple);
+      } else {
+        LOG.warn("Dropping tuple with unrecognized format: {}", tuple);
       }
     }
   };
 
   public final transient DefaultOutputPort<Map<String, String>> output = new DefaultOutputPort<>();
+  private static final Logger LOG = LoggerFactory.getLogger(NycTaxiCsvParser.class);
 }
diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java
index b2168f4..1d4114c 100644
--- a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java
@@ -54,14 +54,16 @@ public class NycTaxiDataReader extends LineByLineFileInputOperator
   protected String readEntity() throws IOException
   {
     String line = super.readEntity();
-    String[] fields = line.split(",");
-    String timestamp = fields[1];
-    if (currentTimestamp == null) {
-      currentTimestamp = timestamp;
-    } else if (timestamp != currentTimestamp) {
-      // suspend emit until the next streaming window when timestamp is different from the current timestamp.
-      suspendEmit = true;
-      currentTimestamp = timestamp;
+    String[] fields = line.split(",", -1);
+    if (fields.length > 1) {
+      String timestamp = fields[1];
+      if (currentTimestamp == null) {
+        currentTimestamp = timestamp;
+      } else if (timestamp != currentTimestamp) {
+        // suspend emit until the next streaming window when timestamp is different from the current timestamp.
+        suspendEmit = true;
+        currentTimestamp = timestamp;
+      }
     }
     return line;
   }

-- 
To stop receiving notification emails like this one, please contact
commits@apex.apache.org.