You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by rg...@apache.org on 2022/08/13 17:51:26 UTC

[flume] branch flume-1.10.1 updated: FLUME-3434: TwitterSource fails to cast int to long Adds in counters and improved testing

This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a commit to branch flume-1.10.1
in repository https://gitbox.apache.org/repos/asf/flume.git


The following commit(s) were added to refs/heads/flume-1.10.1 by this push:
     new 68ba6686 FLUME-3434: TwitterSource fails to cast int to long Adds in counters and improved testing
68ba6686 is described below

commit 68ba66866d376be3c040379b22ddaf451be9c648
Author: tmgstevens <tr...@apache.org>
AuthorDate: Tue Aug 9 15:05:58 2022 +0100

    FLUME-3434: TwitterSource fails to cast int to long
    Adds in counters and improved testing
---
 flume-ng-sources/flume-twitter-source/pom.xml       |  6 ++++++
 .../apache/flume/source/twitter/TwitterSource.java  | 21 +++++++++++++++++----
 .../flume/source/twitter/TestTwitterSource.java     | 21 +++++++++++++++++++++
 3 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/flume-ng-sources/flume-twitter-source/pom.xml b/flume-ng-sources/flume-twitter-source/pom.xml
index 948fcf88..f21a5479 100644
--- a/flume-ng-sources/flume-twitter-source/pom.xml
+++ b/flume-ng-sources/flume-twitter-source/pom.xml
@@ -49,6 +49,12 @@ limitations under the License.
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.easytesting</groupId>
+      <artifactId>fest-reflect</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.twitter4j</groupId>
       <artifactId>twitter4j-core</artifactId>
diff --git a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
index 04314511..04849bb7 100644
--- a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
+++ b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
@@ -41,6 +41,7 @@ import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.AbstractSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,6 +88,8 @@ public class TwitterSource
   private int maxBatchSize = 1000;
   private int maxBatchDurationMillis = 1000;
 
+  private SourceCounter sourceCounter;
+
   // Fri May 14 02:52:55 +0000 2010
   private SimpleDateFormat formatterTo =
       new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
@@ -119,6 +122,10 @@ public class TwitterSource
     maxBatchSize = context.getInteger("maxBatchSize", maxBatchSize);
     maxBatchDurationMillis = context.getInteger("maxBatchDurationMillis",
                                                 maxBatchDurationMillis);
+
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
   }
 
   @Override
@@ -156,17 +163,20 @@ public class TwitterSource
     docs.add(doc);
     if (docs.size() >= maxBatchSize ||
         System.currentTimeMillis() >= batchEndTime) {
+      sourceCounter.addToEventReceivedCount(docs.size());
       batchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
       byte[] bytes;
       try {
         bytes = serializeToAvro(avroSchema, docs);
       } catch (IOException e) {
+        sourceCounter.incrementGenericProcessingFail();
         LOGGER.error("Exception while serializing tweet", e);
         return; //skip
       }
       Event event = EventBuilder.withBody(bytes);
       getChannelProcessor().processEvent(event); // send event to the flume sink
+      sourceCounter.addToEventAcceptedCount(docs.size());
       docs.clear();
     }
     docCount++;
     if ((docCount % REPORT_INTERVAL) == 0) {
@@ -240,7 +250,7 @@ public class TwitterSource
 
     doc.put("id", idPrefix + status.getId());
     doc.put("created_at", formatterTo.format(status.getCreatedAt()));
-    doc.put("retweet_count", status.getRetweetCount());
+    doc.put("retweet_count", Long.valueOf(status.getRetweetCount()));
     doc.put("retweeted", status.isRetweet());
     doc.put("in_reply_to_user_id", status.getInReplyToUserId());
     doc.put("in_reply_to_status_id", status.getInReplyToStatusId());
@@ -268,10 +278,13 @@ public class TwitterSource
       throws IOException {
     serializationBuffer.reset();
     dataFileWriter.create(avroSchema, serializationBuffer);
-    for (Record doc2 : docList) {
-      dataFileWriter.append(doc2);
+    try {
+      for (Record doc2 : docList) {
+        dataFileWriter.append(doc2);
+      }
+    } finally {
+      dataFileWriter.close();
     }
-    dataFileWriter.close();
     return serializationBuffer.toByteArray();
   }
 
diff --git a/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java b/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java
index b2636ee2..034c2e3a 100644
--- a/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java
+++ b/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flume.source.twitter;
 
+import static org.fest.reflect.core.Reflection.field;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
@@ -34,6 +35,8 @@ import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.ChannelCounter;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.sink.DefaultSinkProcessor;
 import org.apache.flume.sink.LoggerSink;
 import org.junit.Assert;
@@ -72,6 +75,7 @@ public class TestTwitterSource extends Assert {
     context.put("accessToken", accessToken);
     context.put("accessTokenSecret", accessTokenSecret);
     context.put("maxBatchDurationMillis", "1000");
+    context.put("maxBatchSize", "1");
 
     TwitterSource source = new TwitterSource();
     source.configure(context);
@@ -100,6 +104,23 @@ public class TestTwitterSource extends Assert {
     source.stop();
     sinkRunner.stop();
     sink.stop();
+
+    long successfulEvents = getTwitterCounterGroup(source).getEventAcceptedCount();
+    long receivedEvents = getTwitterCounterGroup(source).getEventReceivedCount();
+    long channelEvents = getMemoryChannelCounterGroup((MemoryChannel)channel).getEventPutAttemptCount();
+
+    assertEquals("Received vs. Success:", receivedEvents, successfulEvents);
+    assertEquals("Success vs. Channel", channelEvents, successfulEvents);
+
+  }
+
+  private SourceCounter getTwitterCounterGroup(TwitterSource source) {
+    return field("sourceCounter").ofType(SourceCounter.class).in(source).get();
+  }
+
+
+  private ChannelCounter getMemoryChannelCounterGroup(MemoryChannel source) {
+    return field("channelCounter").ofType(ChannelCounter.class).in(source).get();
   }

   @Test