You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by rg...@apache.org on 2022/08/13 17:51:26 UTC
[flume] branch flume-1.10.1 updated: FLUME-3434: TwitterSource fails to cast int to long; adds counters and improves testing
This is an automated email from the ASF dual-hosted git repository.
rgoers pushed a commit to branch flume-1.10.1
in repository https://gitbox.apache.org/repos/asf/flume.git
The following commit(s) were added to refs/heads/flume-1.10.1 by this push:
new 68ba6686 FLUME-3434: TwitterSource fails to cast int to long Adds in counters and improved testing
68ba6686 is described below
commit 68ba66866d376be3c040379b22ddaf451be9c648
Author: tmgstevens <tr...@apache.org>
AuthorDate: Tue Aug 9 15:05:58 2022 +0100
FLUME-3434: TwitterSource fails to cast int to long
Adds source counters and improves testing.
---
flume-ng-sources/flume-twitter-source/pom.xml | 6 ++++++
.../apache/flume/source/twitter/TwitterSource.java | 21 +++++++++++++++++----
.../flume/source/twitter/TestTwitterSource.java | 21 +++++++++++++++++++++
3 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/flume-ng-sources/flume-twitter-source/pom.xml b/flume-ng-sources/flume-twitter-source/pom.xml
index 948fcf88..f21a5479 100644
--- a/flume-ng-sources/flume-twitter-source/pom.xml
+++ b/flume-ng-sources/flume-twitter-source/pom.xml
@@ -49,6 +49,12 @@ limitations under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.easytesting</groupId>
+ <artifactId>fest-reflect</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
diff --git a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
index 04314511..04849bb7 100644
--- a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
+++ b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
@@ -41,6 +41,7 @@ import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +88,8 @@ public class TwitterSource
private int maxBatchSize = 1000;
private int maxBatchDurationMillis = 1000;
+ private SourceCounter sourceCounter;
+
// Fri May 14 02:52:55 +0000 2010
private SimpleDateFormat formatterTo =
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
@@ -119,6 +122,10 @@ public class TwitterSource
maxBatchSize = context.getInteger("maxBatchSize", maxBatchSize);
maxBatchDurationMillis = context.getInteger("maxBatchDurationMillis",
maxBatchDurationMillis);
+
+ if (sourceCounter == null) {
+ sourceCounter = new SourceCounter(getName()); // NOTE(review): no hunk in this patch calls sourceCounter.start()/stop() from start()/stop(); confirm the counter is started so it is registered for monitoring
+ }
}
@Override
@@ -156,17 +163,20 @@ public class TwitterSource
docs.add(doc);
if (docs.size() >= maxBatchSize ||
System.currentTimeMillis() >= batchEndTime) {
+ sourceCounter.addToEventReceivedCount(docs.size());
batchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
byte[] bytes;
try {
bytes = serializeToAvro(avroSchema, docs);
} catch (IOException e) {
+ sourceCounter.incrementGenericProcessingFail();
LOGGER.error("Exception while serializing tweet", e);
return; //skip
}
Event event = EventBuilder.withBody(bytes);
getChannelProcessor().processEvent(event); // send event to the flume sink
+ sourceCounter.addToEventAcceptedCount(docs.size()); // record before clear() empties the batch, otherwise this always adds 0
docs.clear();
}
docCount++;
if ((docCount % REPORT_INTERVAL) == 0) {
@@ -240,7 +250,7 @@ public class TwitterSource
doc.put("id", idPrefix + status.getId());
doc.put("created_at", formatterTo.format(status.getCreatedAt()));
- doc.put("retweet_count", status.getRetweetCount());
+ doc.put("retweet_count", Long.valueOf(status.getRetweetCount()));
doc.put("retweeted", status.isRetweet());
doc.put("in_reply_to_user_id", status.getInReplyToUserId());
doc.put("in_reply_to_status_id", status.getInReplyToStatusId());
@@ -268,10 +278,13 @@ public class TwitterSource
throws IOException {
serializationBuffer.reset();
dataFileWriter.create(avroSchema, serializationBuffer);
- for (Record doc2 : docList) {
- dataFileWriter.append(doc2);
+ try {
+ for (Record doc2 : docList) {
+ dataFileWriter.append(doc2);
+ }
+ } finally {
+ dataFileWriter.close();
}
- dataFileWriter.close();
return serializationBuffer.toByteArray();
}
diff --git a/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java b/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java
index b2636ee2..034c2e3a 100644
--- a/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java
+++ b/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java
@@ -18,6 +18,7 @@
*/
package org.apache.flume.source.twitter;
+import static org.fest.reflect.core.Reflection.field;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
@@ -34,6 +35,8 @@ import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.ChannelCounter;
+import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.sink.DefaultSinkProcessor;
import org.apache.flume.sink.LoggerSink;
import org.junit.Assert;
@@ -72,6 +75,7 @@ public class TestTwitterSource extends Assert {
context.put("accessToken", accessToken);
context.put("accessTokenSecret", accessTokenSecret);
context.put("maxBatchDurationMillis", "1000");
+ context.put("maxBatchSize", "1");
TwitterSource source = new TwitterSource();
source.configure(context);
@@ -100,6 +104,23 @@ public class TestTwitterSource extends Assert {
source.stop();
sinkRunner.stop();
sink.stop();
+
+ long successfulEvents = getTwitterCounterGroup(source).getEventAcceptedCount(); // counted after processEvent() hands the batch to the channel
+ long receivedEvents = getTwitterCounterGroup(source).getEventReceivedCount();
+ long channelEvents = getMemoryChannelCounterGroup((MemoryChannel)channel).getEventPutAttemptCount();
+
+ assertEquals("Received vs. Success:", receivedEvents, successfulEvents);
+ assertEquals("Success vs. Channel", channelEvents, successfulEvents);
+
+ }
+
+ private SourceCounter getTwitterCounterGroup(TwitterSource source) {
+ return field("sourceCounter").ofType(SourceCounter.class).in(source).get();
+ }
+
+
+ private ChannelCounter getMemoryChannelCounterGroup(MemoryChannel source) {
+ return field("channelCounter").ofType(ChannelCounter.class).in(source).get();
}
@Test