You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by rg...@apache.org on 2023/03/18 18:03:55 UTC
[flume-twitter] 01/01: FLUME-3455 - Move Twitter Source to its own repo
This is an automated email from the ASF dual-hosted git repository.
rgoers pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flume-twitter.git
commit 6f08e72901160f568bb6fce6d32ec53f2fb5f8e0
Author: Ralph Goers <rg...@apache.org>
AuthorDate: Sat Mar 18 11:03:46 2023 -0700
FLUME-3455 - Move Twitter Source to its own repo
---
.asf.yaml | 40 +++
CHANGELOG | 10 +
LICENSE.txt | 245 +++++++++++++++
NOTICE.txt | 3 +
README.md | 58 ++++
RELEASE-NOTES.txt | 26 ++
checkstyle-header.txt | 16 +
findbugs-exclude-filter.xml | 31 ++
flume-twitter-dist/pom.xml | 153 +++++++++
flume-twitter-dist/src/assembly/bin.xml | 50 +++
flume-twitter-dist/src/assembly/src.xml | 45 +++
flume-twitter-source/pom.xml | 74 +++++
.../apache/flume/source/twitter/TwitterSource.java | 346 +++++++++++++++++++++
.../flume/source/twitter/TestTwitterSource.java | 132 ++++++++
flume-twitter-source/src/test/resources/log4j2.xml | 37 +++
.../src/test/resources/twitter-flume.conf | 92 ++++++
pom.xml | 286 +++++++++++++++++
17 files changed, 1644 insertions(+)
diff --git a/.asf.yaml b/.asf.yaml
new file mode 100644
index 0000000..c0a61c5
--- /dev/null
+++ b/.asf.yaml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# `.asf.yaml` is a branch-specific YAML configuration file for Git repositories to control features such as notifications, GitHub settings, etc.
+# See its documentation for details: https://cwiki.apache.org/confluence/display/INFRA/Git+-+.asf.yaml+features
+
+notifications:
+ # GitHub already provides notifications for PRs and issues.
+ # Please don't duplicate that noise here!
+ commits: commits@flume.apache.org
+ jira_options: link label
+github:
+ description: "Apache Flume Twitter provides the Twitter Source for Apache Flume"
+ homepage: https://logging.apache.org/flume/
+ features:
+ issues: true
+ del_branch_on_merge: true
+ autolink_jira:
+ - FLUME
+ labels:
+ - apache
+ - api
+ - java
+ - jvm
+ - library
+ - flume
+ protected_branches:
+ main: {}
diff --git a/CHANGELOG b/CHANGELOG
new file mode 100644
index 0000000..1d1fa92
--- /dev/null
+++ b/CHANGELOG
@@ -0,0 +1,10 @@
+Release Notes - Flume Twitter - Version 2.0.0
+
+** Bug
+ *
+
+** Improvement
+ * [FLUME-3455] - Move the Twitter Source to its own repo.
+
+
+
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..9fa7156
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,245 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+====
+
+The following files are included under the 2-Clause BSD License
+
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_ar.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_bg.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_da.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_de.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_es.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_fa.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_fi.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_fr.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_hi.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_hu.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_it.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_nl.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_no.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_pt.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_ro.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_ru.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_sv.txt
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+this list of conditions and the following disclaimer in the documentation and/or
+other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/NOTICE.txt b/NOTICE.txt
new file mode 100644
index 0000000..33c6200
--- /dev/null
+++ b/NOTICE.txt
@@ -0,0 +1,3 @@
+Apache Flume Twitter
+Copyright 2022-2023 The Apache Software Foundation
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e692e04
--- /dev/null
+++ b/README.md
@@ -0,0 +1,58 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Welcome to Apache Flume Twitter!
+
+Apache Flume is a distributed, reliable, and available service for efficiently
+collecting, aggregating, and moving large amounts of event data. It has a simple
+and flexible architecture based on streaming data flows. It is robust and fault
+tolerant with tunable reliability mechanisms and many failover and recovery
+mechanisms. The system is centrally managed and allows for intelligent dynamic
+management. It uses a simple extensible data model that allows for online
+analytic application.
+
+The Apache Flume Twitter module provides a source to receive data from Twitter.
+
+Apache Flume Twitter is open-sourced under the Apache Software Foundation License v2.0.
+
+## Documentation
+
+Documentation is included in the binary distribution under the docs directory.
+In source form, it can be found in the flume-ng-doc directory.
+
+The Flume 1.x guide and FAQ are available here:
+
+* https://cwiki.apache.org/FLUME
+* https://cwiki.apache.org/confluence/display/FLUME/Getting+Started
+
+## Contact us!
+
+* Mailing lists: https://cwiki.apache.org/confluence/display/FLUME/Mailing+Lists
+* Slack channel #flume on https://the-asf.slack.com/
+
+Bug and Issue tracker.
+
+* https://github.com/apache/flume-twitter/issues
+
+## Compiling Flume Twitter
+
+Compiling Flume Twitter requires the following tools:
+
+* Oracle Java JDK 11
+* Apache Maven 3.x
diff --git a/RELEASE-NOTES.txt b/RELEASE-NOTES.txt
new file mode 100644
index 0000000..fdf94b3
--- /dev/null
+++ b/RELEASE-NOTES.txt
@@ -0,0 +1,26 @@
+Apache Flume Twitter 2.0.0
+
+CONTENTS
+1. What is Apache Flume Twitter
+2. Major changes in this Release
+3. How to Get Involved
+4. How to Report Issues
+
+1. What is Apache Flume Twitter
+Flume is a distributed, reliable, and available service for
+efficiently collecting, aggregating, and moving large amounts of event
+data. Flume Twitter allows Flume to collect data from Twitter.
+
+2. Major changes in this Release
+For a detailed list of changes, please see the CHANGELOG file included
+in this distribution.
+
+3. How to Get Involved
+The Apache Flume project really needs and appreciates any contributions,
+including documentation help, source code and feedback. If you are interested
+in contributing, please visit:
+https://cwiki.apache.org/confluence/display/FLUME/How+to+Contribute
+
+4. How to Report Issues
+The Apache Flume Twitter project uses GitHub issues for issue tracking. Please see
+https://github.com/apache/flume-twitter/issues
diff --git a/checkstyle-header.txt b/checkstyle-header.txt
new file mode 100644
index 0000000..4f33236
--- /dev/null
+++ b/checkstyle-header.txt
@@ -0,0 +1,16 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
diff --git a/findbugs-exclude-filter.xml b/findbugs-exclude-filter.xml
new file mode 100644
index 0000000..327be31
--- /dev/null
+++ b/findbugs-exclude-filter.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="iso-8859-1"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- ===================================================================== -->
+<!-- $Id: findbugs-exclude-filter.xml 773234 2009-05-09 15:27:59Z rgoers $ -->
+<!-- ===================================================================== -->
+<FindBugsFilter>
+ <!-- Exclude priority 2 and priority 3 matches so that only high priority warnings remain enabled -->
+ <Match>
+ <Priority value="2"/>
+ </Match>
+
+ <Match>
+ <Priority value="3"/>
+ </Match>
+</FindBugsFilter>
diff --git a/flume-twitter-dist/pom.xml b/flume-twitter-dist/pom.xml
new file mode 100644
index 0000000..9816350
--- /dev/null
+++ b/flume-twitter-dist/pom.xml
@@ -0,0 +1,153 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-twitter-parent</artifactId>
+ <version>2.0.0</version>
+ </parent>
+
+ <artifactId>flume-twitter-dist</artifactId>
+ <name>Flume Twitter Distribution</name>
+ <packaging>pom</packaging>
+
+ <properties>
+ <maven.deploy.skip>true</maven.deploy.skip>
+ <maven.install.skip>true</maven.install.skip>
+ <maven.test.skip>true</maven.test.skip>
+ <spotless.check.skip>true</spotless.check.skip>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-twitter-source</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- calculate SHA-512 checksums of the source and binary release archives for the Apache dist area -->
+ <plugin>
+ <groupId>net.nicoulaj.maven.plugins</groupId>
+ <artifactId>checksum-maven-plugin</artifactId>
+ <version>${checksum-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>calculate-checksums</id>
+ <goals>
+ <goal>files</goal>
+ </goals>
+ <!-- execute prior to maven-gpg-plugin:sign due to https://github.com/nicoulaj/checksum-maven-plugin/issues/112 -->
+ <phase>post-integration-test</phase>
+ <configuration>
+ <algorithms>
+ <algorithm>SHA-512</algorithm>
+ </algorithms>
+ <!-- https://maven.apache.org/apache-resource-bundles/#source-release-assembly-descriptor -->
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.directory}</directory>
+ <includes>
+ <include>apache-flume-twitter-${project.version}-src.zip</include>
+ <include>apache-flume-twitter-${project.version}-src.tar.gz</include>
+ <include>apache-flume-twitter-${project.version}-bin.zip</include>
+ <include>apache-flume-twitter-${project.version}-bin.tar.gz</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+ <csvSummary>false</csvSummary>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>post-integration-test</phase>
+ <configuration>
+ <target>
+ <property name="spaces" value=" " />
+ <concat destfile="${project.build.directory}/apache-flume-twitter-${project.version}-src.zip.sha512" append="yes">${spaces}apache-flume-twitter-${project.version}-src.zip</concat>
+ <concat destfile="${project.build.directory}/apache-flume-twitter-${project.version}-src.tar.gz.sha512" append="yes">${spaces}apache-flume-twitter-${project.version}-src.tar.gz</concat>
+ <concat destfile="${project.build.directory}/apache-flume-twitter-${project.version}-bin.zip.sha512" append="yes">${spaces}apache-flume-twitter-${project.version}-bin.zip</concat>
+ <concat destfile="${project.build.directory}/apache-flume-twitter-${project.version}-bin.tar.gz.sha512" append="yes">${spaces}apache-flume-twitter-${project.version}-bin.tar.gz</concat>
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>source-release-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <finalName>apache-flume-twitter-${project.version}</finalName>
+ <descriptors>
+ <descriptor>src/assembly/src.xml</descriptor>
+ </descriptors>
+ <tarLongFileMode>gnu</tarLongFileMode>
+ </configuration>
+ </execution>
+ <execution>
+ <id>binary</id>
+ <configuration>
+ <finalName>apache-flume-twitter-${project.version}</finalName>
+ <descriptors>
+ <descriptor>src/assembly/bin.xml</descriptor>
+ </descriptors>
+ <tarLongFileMode>gnu</tarLongFileMode>
+ </configuration>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-gpg-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>sign-release-artifacts</id>
+ <goals>
+ <goal>sign</goal>
+ </goals>
+ <configuration>
+ <keyname>${SigningUserName}</keyname>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flume-twitter-dist/src/assembly/bin.xml b/flume-twitter-dist/src/assembly/bin.xml
new file mode 100644
index 0000000..cb67c17
--- /dev/null
+++ b/flume-twitter-dist/src/assembly/bin.xml
@@ -0,0 +1,50 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<assembly>
+ <id>bin</id>
+ <formats>
+ <format>tar.gz</format>
+ <format>zip</format>
+ </formats>
+ <baseDirectory>apache-flume-twitter-${project.version}-bin</baseDirectory>
+ <includeSiteDirectory>false</includeSiteDirectory>
+ <moduleSets>
+ <moduleSet>
+ <useAllReactorProjects>true</useAllReactorProjects>
+ </moduleSet>
+ </moduleSets>
+ <dependencySets>
+ <dependencySet>
+ <includes>
+ <include>org.apache.flume:flume-twitter-source</include> <!-- groupId:artifactId of the jar shipped in the binary distribution; must match the dist module's dependency -->
+ </includes>
+ <outputDirectory></outputDirectory>
+ <unpack>false</unpack>
+ </dependencySet>
+ </dependencySets>
+
+ <fileSets>
+ <fileSet>
+ <directory>../</directory>
+ <includes>
+ <include>LICENSE.txt</include>
+ <include>NOTICE.txt</include>
+ <include>RELEASE-NOTES.txt</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/flume-twitter-dist/src/assembly/src.xml b/flume-twitter-dist/src/assembly/src.xml
new file mode 100644
index 0000000..5873726
--- /dev/null
+++ b/flume-twitter-dist/src/assembly/src.xml
@@ -0,0 +1,45 @@
+<?xml version='1.0' encoding='UTF-8'?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<assembly>
+ <id>src</id>
+ <formats>
+ <format>zip</format>
+ <format>tar.gz</format>
+ </formats>
+ <baseDirectory>apache-flume-twitter-${project.version}-src</baseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>../</directory>
+ <!-- Package the whole project tree, minus build output, IDE metadata and the binary Maven wrapper jar -->
+ <excludes>
+ <exclude>**/target/**</exclude>
+ <exclude>**/.classpath</exclude>
+ <exclude>**/.project</exclude>
+ <exclude>**/.idea/**</exclude>
+ <exclude>**/*.iml</exclude>
+ <exclude>**/.settings/**</exclude>
+ <exclude>lib/**</exclude>
+ <exclude>**/.DS_Store</exclude>
+ <exclude>.mvn/wrapper/maven-wrapper.jar</exclude>
+ </excludes>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/flume-twitter-source/pom.xml b/flume-twitter-source/pom.xml
new file mode 100644
index 0000000..a99874a
--- /dev/null
+++ b/flume-twitter-source/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <!-- Module that builds the Twitter source itself. -->
+  <parent>
+    <groupId>org.apache.flume</groupId>
+    <artifactId>flume-twitter-parent</artifactId>
+    <version>2.0.0</version>
+  </parent>
+
+  <groupId>org.apache.flume</groupId>
+  <artifactId>flume-twitter-source</artifactId>
+  <name>Flume Twitter Source</name>
+
+  <properties>
+    <!-- TODO fix spotbugs/pmd violations -->
+    <spotbugs.maxAllowedViolations>4</spotbugs.maxAllowedViolations>
+    <pmd.maxAllowedViolations>2</pmd.maxAllowedViolations>
+    <module.name>org.apache.flume.source.twitter</module.name>
+  </properties>
+
+  <!-- No versions or (for the twitter4j/flume artifacts) scopes are declared
+       here; presumably they are managed by the parent POM (verify there). -->
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.easytesting</groupId>
+      <artifactId>fest-reflect</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-media-support</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-stream</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build/>
+</project>
diff --git a/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java b/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
new file mode 100644
index 0000000..04849bb
--- /dev/null
+++ b/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source.twitter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.conf.BatchSizeSupported;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.source.AbstractSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import twitter4j.MediaEntity;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+import twitter4j.User;
+import twitter4j.auth.AccessToken;
+
+/**
+ * Demo Flume source that connects via Streaming API to the 1% sample twitter
+ * firehose, continuously downloads tweets, converts them to Avro format and
+ * sends Avro events to a downstream Flume sink.
+ *
+ * <p>Requires the consumer and access tokens and secrets of a Twitter developer
+ * account.
+ *
+ * <p>Tweets are buffered as Avro records. When a tweet arrives and either
+ * {@code maxBatchSize} records are buffered or {@code maxBatchDurationMillis}
+ * has elapsed since the previous flush, the whole buffer is serialized into a
+ * single event body and handed to the channel processor.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TwitterSource
+    extends AbstractSource
+    implements EventDrivenSource, Configurable, StatusListener, BatchSizeSupported {
+
+  /** A progress line is logged every REPORT_INTERVAL processed tweets. */
+  private static final int REPORT_INTERVAL = 100;
+  /** Full statistics are logged every STATS_INTERVAL processed tweets. */
+  private static final int STATS_INTERVAL = REPORT_INTERVAL * 10;
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TwitterSource.class);
+
+  private TwitterStream twitterStream;
+  private Schema avroSchema;
+
+  // Running statistics; reset in start() and reported by logStats().
+  private long docCount = 0;
+  private long startTime = 0;
+  private long exceptionCount = 0;
+  private long totalTextIndexed = 0;
+  private long skippedDocs = 0;
+
+  // Wall-clock time (ms) at or after which the next tweet triggers a flush.
+  private long batchEndTime = 0;
+  // Records buffered since the last successful flush.
+  private final List<Record> docs = new ArrayList<>();
+  // Reused for every flush; reset at the start of serializeToAvro().
+  private final ByteArrayOutputStream serializationBuffer =
+      new ByteArrayOutputStream();
+  private DataFileWriter<GenericRecord> dataFileWriter;
+
+  private int maxBatchSize = 1000;
+  private int maxBatchDurationMillis = 1000;
+
+  private SourceCounter sourceCounter;
+
+  // Fri May 14 02:52:55 +0000 2010
+  private final SimpleDateFormat formatterTo =
+      new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+  private final DecimalFormat numFormatter = new DecimalFormat("###,###.###");
+
+  public TwitterSource() {
+    // The pattern appends a literal 'Z' (the ISO 8601 UTC designator), so the
+    // formatter has to render in UTC; with the JVM default zone the value
+    // would be local time mislabelled as UTC.
+    formatterTo.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
+  }
+
+  /**
+   * Reads the OAuth credentials and batching limits from the context, creates
+   * the Twitter stream with this object registered as its listener, and
+   * prepares the Avro schema and writer.
+   *
+   * @param context supplies {@code consumerKey}, {@code consumerSecret},
+   *        {@code accessToken}, {@code accessTokenSecret} and the optional
+   *        {@code maxBatchSize} / {@code maxBatchDurationMillis} (both
+   *        defaulting to the current field values)
+   */
+  @Override
+  public void configure(Context context) {
+    String consumerKey = context.getString("consumerKey");
+    String consumerSecret = context.getString("consumerSecret");
+    String accessToken = context.getString("accessToken");
+    String accessTokenSecret = context.getString("accessTokenSecret");
+
+    twitterStream = new TwitterStreamFactory().getInstance();
+    twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
+    twitterStream.setOAuthAccessToken(new AccessToken(accessToken,
+                                                      accessTokenSecret));
+    twitterStream.addListener(this);
+    avroSchema = createAvroSchema();
+    dataFileWriter = new DataFileWriter<GenericRecord>(
+        new GenericDatumWriter<GenericRecord>(avroSchema));
+
+    maxBatchSize = context.getInteger("maxBatchSize", maxBatchSize);
+    maxBatchDurationMillis = context.getInteger("maxBatchDurationMillis",
+                                                maxBatchDurationMillis);
+
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
+  }
+
+  /** Resets the statistics, opens the sample stream and marks the source started. */
+  @Override
+  public synchronized void start() {
+    LOGGER.info("Starting twitter source {} ...", this);
+    docCount = 0;
+    startTime = System.currentTimeMillis();
+    exceptionCount = 0;
+    totalTextIndexed = 0;
+    skippedDocs = 0;
+    batchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
+    twitterStream.sample();
+    LOGGER.info("Twitter source {} started.", getName());
+    // This should happen at the end of the start method, since this will
+    // change the lifecycle status of the component to tell the Flume
+    // framework that this component has started. Doing this any earlier
+    // tells the framework that the component started successfully, even
+    // if the method actually fails later.
+    super.start();
+  }
+
+  /** Shuts the Twitter stream down and marks the source stopped. */
+  @Override
+  public synchronized void stop() {
+    LOGGER.info("Twitter source {} stopping...", getName());
+    twitterStream.shutdown();
+    super.stop();
+    LOGGER.info("Twitter source {} stopped.", getName());
+  }
+
+  /**
+   * Buffers one tweet and, when the size or time limit is reached, flushes the
+   * buffered records as a single Avro-encoded event.
+   *
+   * @param status the tweet delivered by the stream
+   */
+  @Override
+  public void onStatus(Status status) {
+    Record doc = extractRecord("", avroSchema, status);
+    if (doc == null) {
+      return; // skip
+    }
+    docs.add(doc);
+    if (docs.size() >= maxBatchSize ||
+        System.currentTimeMillis() >= batchEndTime) {
+      // Capture the size once: it is needed after the buffer has been handed off.
+      int batchSize = docs.size();
+      sourceCounter.addToEventReceivedCount(batchSize);
+      batchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
+      byte[] bytes;
+      try {
+        bytes = serializeToAvro(avroSchema, docs);
+      } catch (IOException e) {
+        exceptionCount++;
+        sourceCounter.incrementGenericProcessingFail();
+        LOGGER.error("Exception while serializing tweet", e);
+        // NOTE(review): the buffer is kept, so the same records are retried
+        // (and counted as received again) on the next flush - confirm intent.
+        return; //skip
+      }
+      Event event = EventBuilder.withBody(bytes);
+      getChannelProcessor().processEvent(event); // send event to the flume sink
+      // Count before clearing; counting after clear() always added zero.
+      sourceCounter.addToEventAcceptedCount(batchSize);
+      docs.clear();
+    }
+    docCount++;
+    if ((docCount % REPORT_INTERVAL) == 0) {
+      LOGGER.info(String.format("Processed %s docs",
+                                numFormatter.format(docCount)));
+    }
+    if ((docCount % STATS_INTERVAL) == 0) {
+      logStats();
+    }
+  }
+
+  /**
+   * Builds the record schema: a required string {@code id} followed by
+   * optional (nullable) fields. Field order is part of the schema and is kept
+   * exactly as declared here.
+   */
+  private Schema createAvroSchema() {
+    Schema schema = Schema.createRecord("Doc", "adoc", null, false);
+    List<Field> fields = new ArrayList<>();
+    fields.add(new Field("id", Schema.create(Type.STRING), null, null));
+    fields.add(optionalField("user_friends_count", Type.INT));
+    fields.add(optionalField("user_location", Type.STRING));
+    fields.add(optionalField("user_description", Type.STRING));
+    fields.add(optionalField("user_statuses_count", Type.INT));
+    fields.add(optionalField("user_followers_count", Type.INT));
+    fields.add(optionalField("user_name", Type.STRING));
+    fields.add(optionalField("user_screen_name", Type.STRING));
+    fields.add(optionalField("created_at", Type.STRING));
+    fields.add(optionalField("text", Type.STRING));
+    fields.add(optionalField("retweet_count", Type.LONG));
+    fields.add(optionalField("retweeted", Type.BOOLEAN));
+    fields.add(optionalField("in_reply_to_user_id", Type.LONG));
+    fields.add(optionalField("source", Type.STRING));
+    fields.add(optionalField("in_reply_to_status_id", Type.LONG));
+    fields.add(optionalField("media_url_https", Type.STRING));
+    fields.add(optionalField("expanded_url", Type.STRING));
+    schema.setFields(fields);
+    return schema;
+  }
+
+  /** Creates a field of the given primitive type that may also be null. */
+  private Field optionalField(String name, Type type) {
+    return new Field(name, createOptional(Schema.create(type)), null, null);
+  }
+
+  /**
+   * Copies the tweet and its author's profile data into a new Avro record.
+   *
+   * @param idPrefix text prepended to the tweet id
+   * @param avroSchema schema the record is created with
+   * @param status the tweet to convert
+   * @return the populated record
+   */
+  private Record extractRecord(String idPrefix, Schema avroSchema, Status status) {
+    User user = status.getUser();
+    Record doc = new Record(avroSchema);
+
+    doc.put("id", idPrefix + status.getId());
+    doc.put("created_at", formatterTo.format(status.getCreatedAt()));
+    doc.put("retweet_count", Long.valueOf(status.getRetweetCount()));
+    doc.put("retweeted", status.isRetweet());
+    doc.put("in_reply_to_user_id", status.getInReplyToUserId());
+    doc.put("in_reply_to_status_id", status.getInReplyToStatusId());
+
+    addString(doc, "source", status.getSource());
+    addString(doc, "text", status.getText());
+
+    // Only the first attached media entity is recorded.
+    MediaEntity[] mediaEntities = status.getMediaEntities();
+    if (mediaEntities.length > 0) {
+      addString(doc, "media_url_https", mediaEntities[0].getMediaURLHttps());
+      addString(doc, "expanded_url", mediaEntities[0].getExpandedURL());
+    }
+
+    doc.put("user_friends_count", user.getFriendsCount());
+    doc.put("user_statuses_count", user.getStatusesCount());
+    doc.put("user_followers_count", user.getFollowersCount());
+    addString(doc, "user_location", user.getLocation());
+    addString(doc, "user_description", user.getDescription());
+    addString(doc, "user_screen_name", user.getScreenName());
+    addString(doc, "user_name", user.getName());
+    return doc;
+  }
+
+  /**
+   * Writes the records through the shared data file writer into the reusable
+   * buffer and returns a copy of the bytes.
+   *
+   * @throws IOException if the writer fails to create, append or close
+   */
+  private byte[] serializeToAvro(Schema avroSchema, List<Record> docList)
+      throws IOException {
+    serializationBuffer.reset();
+    dataFileWriter.create(avroSchema, serializationBuffer);
+    try {
+      for (Record doc2 : docList) {
+        dataFileWriter.append(doc2);
+      }
+    } finally {
+      // Always close so the writer can be re-created for the next batch.
+      dataFileWriter.close();
+    }
+    return serializationBuffer.toByteArray();
+  }
+
+  /** Wraps the schema in a union with null, in that order. */
+  private Schema createOptional(Schema schema) {
+    return Schema.createUnion(Arrays.asList(schema, Schema.create(Type.NULL)));
+  }
+
+  /** Stores a non-null string in the record and adds its length to the text total. */
+  private void addString(Record doc, String avroField, String val) {
+    if (val == null) {
+      return;
+    }
+    doc.put(avroField, val);
+    totalTextIndexed += val.length();
+  }
+
+  /** Logs the cumulative throughput statistics gathered since start(). */
+  private void logStats() {
+    double mbIndexed = totalTextIndexed / (1024 * 1024.0);
+    long seconds = (System.currentTimeMillis() - startTime) / 1000;
+    seconds = Math.max(seconds, 1); // avoid dividing by zero in the first second
+    LOGGER.info(String.format("Total docs indexed: %s, total skipped docs: %s",
+                numFormatter.format(docCount), numFormatter.format(skippedDocs)));
+    LOGGER.info(String.format(" %s docs/second",
+                numFormatter.format(docCount / seconds)));
+    LOGGER.info(String.format("Run took %s seconds and processed:",
+                numFormatter.format(seconds)));
+    LOGGER.info(String.format(" %s MB/sec sent to index",
+                numFormatter.format(((float) totalTextIndexed / (1024 * 1024)) / seconds)));
+    LOGGER.info(String.format(" %s MB text sent to index",
+                numFormatter.format(mbIndexed)));
+    LOGGER.info(String.format("There were %s exceptions ignored: ",
+                numFormatter.format(exceptionCount)));
+  }
+
+  @Override
+  public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
+    // Do nothing...
+  }
+
+  @Override
+  public void onScrubGeo(long userId, long upToStatusId) {
+    // Do nothing...
+  }
+
+  @Override
+  public void onStallWarning(StallWarning warning) {
+    // Do nothing...
+  }
+
+  @Override
+  public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
+    // Do nothing...
+  }
+
+  /** Logs a streaming failure and includes it in the exception statistic. */
+  @Override
+  public void onException(Exception e) {
+    exceptionCount++;
+    LOGGER.error("Exception while streaming tweets", e);
+  }
+
+  /** Returns the configured maximum number of tweets per event. */
+  @Override
+  public long getBatchSize() {
+    return maxBatchSize;
+  }
+}
diff --git a/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java b/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java
new file mode 100644
index 0000000..034c2e3
--- /dev/null
+++ b/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source.twitter;
+
+import static org.fest.reflect.core.Reflection.field;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.ChannelCounter;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.sink.DefaultSinkProcessor;
+import org.apache.flume.sink.LoggerSink;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestTwitterSource extends Assert {
+
+ @BeforeClass
+ public static void setUp() {
+ try {
+ Assume.assumeNotNull(InetAddress.getByName("stream.twitter.com"));
+ } catch (UnknownHostException e) {
+ Assume.assumeTrue(false); // ignore Test if twitter is unreachable
+ }
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+ String consumerKey = System.getProperty("twitter.consumerKey");
+ Assume.assumeNotNull(consumerKey);
+
+ String consumerSecret = System.getProperty("twitter.consumerSecret");
+ Assume.assumeNotNull(consumerSecret);
+
+ String accessToken = System.getProperty("twitter.accessToken");
+ Assume.assumeNotNull(accessToken);
+
+ String accessTokenSecret = System.getProperty("twitter.accessTokenSecret");
+ Assume.assumeNotNull(accessTokenSecret);
+
+ Context context = new Context();
+ context.put("consumerKey", consumerKey);
+ context.put("consumerSecret", consumerSecret);
+ context.put("accessToken", accessToken);
+ context.put("accessTokenSecret", accessTokenSecret);
+ context.put("maxBatchDurationMillis", "1000");
+ context.put("maxBatchSize", "1");
+
+ TwitterSource source = new TwitterSource();
+ source.configure(context);
+
+ Map<String, String> channelContext = new HashMap();
+ channelContext.put("capacity", "1000000");
+ channelContext.put("keep-alive", "0"); // for faster tests
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, new Context(channelContext));
+
+ Sink sink = new LoggerSink();
+ sink.setChannel(channel);
+ sink.start();
+ DefaultSinkProcessor proc = new DefaultSinkProcessor();
+ proc.setSinks(Collections.singletonList(sink));
+ SinkRunner sinkRunner = new SinkRunner(proc);
+ sinkRunner.start();
+
+ ChannelSelector rcs = new ReplicatingChannelSelector();
+ rcs.setChannels(Collections.singletonList(channel));
+ ChannelProcessor chp = new ChannelProcessor(rcs);
+ source.setChannelProcessor(chp);
+ source.start();
+
+ Thread.sleep(5000);
+ source.stop();
+ sinkRunner.stop();
+ sink.stop();
+
+ long successfulEvents = getTwitterCounterGroup(source).getEventReceivedCount();
+ long receivedEvents = getTwitterCounterGroup(source).getEventReceivedCount();
+ long channelEvents = getMemoryChannelCounterGroup((MemoryChannel)channel).getEventPutAttemptCount();
+
+ assertEquals("Received vs. Success:", receivedEvents, successfulEvents);
+ assertEquals("Success vs. Channel", channelEvents, successfulEvents);
+
+ }
+
+ private SourceCounter getTwitterCounterGroup(TwitterSource source) {
+ return field("sourceCounter").ofType(SourceCounter.class).in(source).get();
+ }
+
+
+ private ChannelCounter getMemoryChannelCounterGroup(MemoryChannel source) {
+ return field("channelCounter").ofType(ChannelCounter.class).in(source).get();
+ }
+
+ @Test
+ public void testCarrotDateFormatBug() throws Exception {
+ SimpleDateFormat formatterFrom = new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy");
+ formatterFrom.parse("Fri Oct 26 22:53:55 +0000 2012");
+ }
+
+}
diff --git a/flume-twitter-source/src/test/resources/log4j2.xml b/flume-twitter-source/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..0f3d384
--- /dev/null
+++ b/flume-twitter-source/src/test/resources/log4j2.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<Configuration status="OFF">
+  <!-- Test logging setup: everything goes to the console; the root level is
+       WARN and the named loggers below override it for their packages. -->
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%-4r [%t] %-5p %c %x - %m%n" />
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Logger name="org.apache.flume.sink" level="INFO"/>
+    <Logger name="org.apache.solr.morphline" level="DEBUG"/>
+    <!-- "org.apache.solr" used to be declared twice (INFO first, DEBUG later).
+         Only the later DEBUG entry is kept. NOTE(review): Log4j 2 is
+         understood to let the last duplicate win; confirm before relying on it. -->
+    <Logger name="org.apache.solr" level="DEBUG"/>
+    <Logger name="org.apache.solr.update.processor.LogUpdateProcessor" level="WARN"/>
+    <Logger name="org.apache.solr.core.SolrCore" level="WARN"/>
+    <Logger name="org.apache.solr.search.SolrIndexSearcher" level="ERROR"/>
+    <Root level="WARN">
+      <AppenderRef ref="Console" />
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/flume-twitter-source/src/test/resources/twitter-flume.conf b/flume-twitter-source/src/test/resources/twitter-flume.conf
new file mode 100644
index 0000000..72fe4ef
--- /dev/null
+++ b/flume-twitter-source/src/test/resources/twitter-flume.conf
@@ -0,0 +1,92 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# The configuration file needs to define the sources,
+# the channels and the sinks.
+# Sources, channels and sinks are defined per agent,
+# in this case called 'agent'
+
+agent.sources = twitterSrc
+#agent.sources = httpSrc
+#agent.sources = spoolSrc
+#agent.sources = avroSrc
+agent.channels = memoryChannel
+agent.sinks = solrSink
+#agent.sinks = loggerSink
+
+agent.sources.twitterSrc.type = org.apache.flume.source.twitter.TwitterSource
+agent.sources.twitterSrc.consumerKey = YOUR_TWITTER_CONSUMER_KEY
+agent.sources.twitterSrc.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
+agent.sources.twitterSrc.accessToken = YOUR_TWITTER_ACCESS_TOKEN
+agent.sources.twitterSrc.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
+agent.sources.twitterSrc.maxBatchDurationMillis = 200
+agent.sources.twitterSrc.channels = memoryChannel
+
+agent.sources.httpSrc.type = org.apache.flume.source.http.HTTPSource
+agent.sources.httpSrc.port = 5140
+agent.sources.httpSrc.handler = org.apache.flume.sink.solr.morphline.BlobHandler
+agent.sources.httpSrc.handler.maxBlobLength = 2000000000
+#agent.sources.httpSrc.interceptors = uuidinterceptor
+#agent.sources.httpSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
+#agent.sources.httpSrc.interceptors.uuidinterceptor.headerName = id
+##agent.sources.httpSrc.interceptors.uuidinterceptor.preserveExisting = false
+##agent.sources.httpSrc.interceptors.uuidinterceptor.prefix = myhostname
+agent.sources.httpSrc.channels = memoryChannel
+
+agent.sources.spoolSrc.type = spooldir
+agent.sources.spoolSrc.spoolDir = /tmp/myspooldir
+agent.sources.spoolSrc.ignorePattern = \.
+agent.sources.spoolSrc.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
+agent.sources.spoolSrc.deserializer.maxBlobLength = 2000000000
+agent.sources.spoolSrc.batchSize = 1
+agent.sources.spoolSrc.fileHeader = true
+agent.sources.spoolSrc.fileHeaderKey = resourceName
+#agent.sources.spoolSrc.interceptors = uuidinterceptor
+#agent.sources.spoolSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
+#agent.sources.spoolSrc.interceptors.uuidinterceptor.headerName = id
+##agent.sources.spoolSrc.interceptors.uuidinterceptor.preserveExisting = false
+##agent.sources.spoolSrc.interceptors.uuidinterceptor.prefix = myhostname
+agent.sources.spoolSrc.channels = memoryChannel
+
+agent.sources.avroSrc.type = avro
+agent.sources.avroSrc.bind = 127.0.0.1
+agent.sources.avroSrc.port = 10000
+agent.sources.avroSrc.channels = memoryChannel
+agent.sources.avroSrc.interceptors = uuidinterceptor
+agent.sources.avroSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
+agent.sources.avroSrc.interceptors.uuidinterceptor.headerName = id
+#agent.sources.avroSrc.interceptors.uuidinterceptor.preserveExisting = false
+#agent.sources.avroSrc.interceptors.uuidinterceptor.prefix = myhostname
+#agent.sources.avroSrc.interceptors = morphlineinterceptor
+#agent.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
+#agent.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
+#agent.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
+
+agent.channels.memoryChannel.type = memory
+agent.channels.memoryChannel.capacity = 10000
+agent.channels.memoryChannel.transactionCapacity = 1000
+
+#agent.channels.fileChannel.type = file
+#agent.channels.fileChannel.capacity = 1000000
+#agent.channels.fileChannel.transactionCapacity = 1000
+#agent.channels.fileChannel.write-timeout = 1
+
+agent.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
+agent.sinks.solrSink.channel = memoryChannel
+#agent.sinks.solrSink.batchSize = 1000
+#agent.sinks.solrSink.batchDurationMillis = 1000
+agent.sinks.solrSink.morphlineFile = /etc/flume-ng/conf/morphline.conf
+#agent.sinks.solrSink.morphlineId = morphline1
+
+#agent.sinks.loggerSink.type = logger
+#agent.sinks.loggerSink.channel = memoryChannel
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..49cbbca
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,286 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent> <!-- Inherits from the ASF-wide parent POM, org.apache:apache version 29 -->
+ <groupId>org.apache</groupId>
+ <artifactId>apache</artifactId>
+ <version>29</version>
+ </parent>
+
+ <groupId>org.apache.flume</groupId> <!-- Coordinates of this aggregator POM: groupId, artifactId, version -->
+ <artifactId>flume-twitter-parent</artifactId>
+ <version>2.0.0</version>
+ <packaging>pom</packaging>
+ <name>Flume Twitter Parent</name>
+
+ <properties> <!-- Grouped by role; property order has no effect on the build -->
+   <ReleaseVersion>2.0.0</ReleaseVersion> <!-- release metadata -->
+   <ReleaseManager>Ralph Goers</ReleaseManager>
+   <ReleaseKey>B3D8E1BA</ReleaseKey>
+   <SigningUserName>rgoers@apache.org</SigningUserName>
+   <maven.compiler.source>11</maven.compiler.source> <!-- build settings -->
+   <maven.compiler.target>11</maven.compiler.target>
+   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+   <module.name>org.apache.flume.twitter</module.name>
+   <flume.version>1.11.0</flume.version> <!-- dependency versions -->
+   <log4j.version>2.20.0</log4j.version>
+   <slf4j.version>1.7.32</slf4j.version>
+   <twitter4j.version>4.0.7</twitter4j.version>
+   <twitter4j-media.version>4.0.6</twitter4j-media.version>
+   <fest-reflect.version>1.4</fest-reflect.version>
+   <junit.version>4.13.2</junit.version>
+   <checksum-maven-plugin.version>1.11</checksum-maven-plugin.version> <!-- plugin versions -->
+   <findsecbugs-plugin.version>1.12.0</findsecbugs-plugin.version>
+   <mvn-gpg-plugin.version>1.6</mvn-gpg-plugin.version>
+   <mvn-javadoc-plugin.version>2.9</mvn-javadoc-plugin.version>
+   <rat.version>0.12</rat.version>
+   <spotbugs-maven-plugin.version>4.7.2.1</spotbugs-maven-plugin.version>
+   <spotless-maven-plugin.version>2.27.2</spotless-maven-plugin.version>
+ </properties>
+ <dependencyManagement> <!-- Central versions and default scopes for the dependencies used by the modules -->
+   <dependencies>
+     <dependency> <!-- Flume artifacts -->
+       <groupId>org.apache.flume</groupId>
+       <artifactId>flume-twitter-source</artifactId>
+       <version>${project.version}</version>
+     </dependency>
+     <dependency>
+       <groupId>org.apache.flume</groupId>
+       <artifactId>flume-ng-core</artifactId>
+       <version>${flume.version}</version>
+     </dependency>
+     <dependency>
+       <groupId>org.apache.flume</groupId>
+       <artifactId>flume-ng-node</artifactId>
+       <version>${flume.version}</version>
+     </dependency>
+
+     <dependency> <!-- Logging: Log4j 2 plus the SLF4J API -->
+       <groupId>org.apache.logging.log4j</groupId>
+       <artifactId>log4j-api</artifactId>
+       <version>${log4j.version}</version>
+     </dependency>
+     <dependency>
+       <groupId>org.apache.logging.log4j</groupId>
+       <artifactId>log4j-core</artifactId>
+       <version>${log4j.version}</version>
+     </dependency>
+     <dependency>
+       <groupId>org.apache.logging.log4j</groupId>
+       <artifactId>log4j-slf4j-impl</artifactId>
+       <version>${log4j.version}</version>
+     </dependency>
+     <dependency>
+       <groupId>org.slf4j</groupId>
+       <artifactId>slf4j-api</artifactId>
+       <version>${slf4j.version}</version>
+     </dependency>
+
+     <!-- Dependencies of the Twitter source -->
+     <dependency>
+       <groupId>org.twitter4j</groupId>
+       <artifactId>twitter4j-core</artifactId>
+       <version>${twitter4j.version}</version>
+     </dependency>
+     <dependency>
+       <groupId>org.twitter4j</groupId>
+       <artifactId>twitter4j-media-support</artifactId>
+       <version>${twitter4j-media.version}</version>
+     </dependency>
+     <dependency>
+       <groupId>org.twitter4j</groupId>
+       <artifactId>twitter4j-stream</artifactId>
+       <version>${twitter4j.version}</version>
+     </dependency>
+
+     <dependency> <!-- Test-only libraries: both are managed with test scope -->
+       <groupId>org.easytesting</groupId>
+       <artifactId>fest-reflect</artifactId>
+       <version>${fest-reflect.version}</version>
+       <scope>test</scope>
+     </dependency>
+     <dependency>
+       <groupId>junit</groupId>
+       <artifactId>junit</artifactId>
+       <version>${junit.version}</version>
+       <scope>test</scope> <!-- keeps JUnit out of compile scope when a module omits the scope -->
+     </dependency>
+   </dependencies>
+ </dependencyManagement>
+
+ <inceptionYear>2022</inceptionYear>
+
+ <issueManagement> <!-- Issues are tracked in the FLUME JIRA project -->
+   <url>https://issues.apache.org/jira/browse/FLUME</url>
+   <system>JIRA</system>
+ </issueManagement>
+
+ <licenses>
+   <license> <!-- Official license name, with the license text referenced over HTTPS -->
+     <name>Apache License, Version 2.0</name>
+     <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+   </license>
+ </licenses>
+
+ <mailingLists> <!-- Archive links use HTTPS and point at the lists.apache.org archive of each list -->
+   <mailingList>
+     <name>Flume User List</name>
+     <subscribe>user-subscribe@flume.apache.org</subscribe>
+     <unsubscribe>user-unsubscribe@flume.apache.org</unsubscribe>
+     <post>user@flume.apache.org</post>
+     <archive>https://lists.apache.org/list.html?user@flume.apache.org</archive>
+   </mailingList>
+   <mailingList>
+     <name>Flume Developer List</name>
+     <subscribe>dev-subscribe@flume.apache.org</subscribe>
+     <unsubscribe>dev-unsubscribe@flume.apache.org</unsubscribe>
+     <post>dev@flume.apache.org</post>
+     <archive>https://lists.apache.org/list.html?dev@flume.apache.org</archive>
+   </mailingList>
+   <mailingList>
+     <name>Flume Commits</name>
+     <subscribe>commits-subscribe@flume.apache.org</subscribe>
+     <unsubscribe>commits-unsubscribe@flume.apache.org</unsubscribe>
+     <post>commits@flume.apache.org</post>
+     <archive>https://lists.apache.org/list.html?commits@flume.apache.org</archive>
+   </mailingList>
+ </mailingLists>
+
+ <scm> <!-- This project's own repository (flume-twitter); connection values use Maven's scm:git: URL form -->
+   <url>https://gitbox.apache.org/repos/asf/flume-twitter.git</url>
+   <connection>scm:git:https://gitbox.apache.org/repos/asf/flume-twitter.git</connection>
+   <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/flume-twitter.git</developerConnection>
+ </scm>
+
+ <developers> <!-- Committers listed for this module set -->
+   <developer>
+     <id>rgoers</id>
+     <name>Ralph Goers</name>
+     <email>rgoers@apache.org</email>
+     <organization>Intuit</organization>
+   </developer>
+ </developers>
+
+ <organization> <!-- Official foundation name, with the site referenced over HTTPS -->
+   <name>The Apache Software Foundation</name>
+   <url>https://www.apache.org/</url>
+ </organization>
+ <modules> <!-- Default build covers only the source module; flume-twitter-dist is added by the release profile -->
+ <module>flume-twitter-source</module>
+ </modules>
+ <build>
+   <plugins>
+     <plugin> <!-- Runs the apache-rat-plugin check goal in the verify phase; the excludes below are left out of the check -->
+       <groupId>org.apache.rat</groupId>
+       <artifactId>apache-rat-plugin</artifactId>
+       <version>${rat.version}</version>
+       <executions>
+         <execution>
+           <id>verify.rat</id>
+           <phase>verify</phase>
+           <goals>
+             <goal>check</goal>
+           </goals>
+         </execution>
+       </executions>
+       <configuration>
+         <consoleOutput>true</consoleOutput>
+         <excludes>
+           <exclude>**/.idea/</exclude>
+           <exclude>**/*.iml</exclude>
+           <exclude>**/nb-configuration.xml</exclude>
+           <exclude>**/.settings/*</exclude>
+           <exclude>**/.classpath</exclude>
+           <exclude>**/.project</exclude>
+           <exclude>.git/</exclude>
+           <exclude>.gitignore</exclude>
+           <exclude>patchprocess/</exclude>
+           <!-- ASF jenkins box puts the Maven repo in our root directory. -->
+           <exclude>.repository/</exclude>
+           <exclude>.mvn/**</exclude>
+           <exclude>**/target/**</exclude>
+           <exclude>**/*.yml</exclude>
+           <exclude>**/*.yaml</exclude>
+           <exclude>**/*.json</exclude>
+           <exclude>**/*.diff</exclude>
+           <exclude>**/*.patch</exclude>
+           <exclude>**/*.avsc</exclude>
+           <exclude>**/*.avro</exclude>
+           <exclude>src/main/resources/META-INF/services/**/*</exclude>
+           <exclude>**/docs/**</exclude>
+           <exclude>**/test/resources/**</exclude>
+           <exclude>**/derby.log</exclude>
+           <exclude>**/metastore_db/</exclude>
+           <exclude>**/exclude-pmd.properties</exclude>
+         </excludes>
+       </configuration>
+     </plugin>
+   </plugins>
+ </build>
+
+ <profiles>
+   <profile> <!-- Release build: adds the flume-twitter-dist module, GPG signing at verify and an aggregate javadoc jar at package -->
+     <id>release</id>
+     <modules>
+       <module>flume-twitter-dist</module>
+     </modules>
+     <build>
+       <plugins>
+         <plugin>
+           <groupId>org.apache.maven.plugins</groupId>
+           <artifactId>maven-gpg-plugin</artifactId>
+           <version>${mvn-gpg-plugin.version}</version>
+           <executions>
+             <execution>
+               <configuration>
+                 <keyname>${SigningUserName}</keyname>
+               </configuration>
+               <phase>verify</phase>
+               <goals>
+                 <goal>sign</goal>
+               </goals>
+             </execution>
+           </executions>
+         </plugin>
+         <plugin>
+           <groupId>org.apache.maven.plugins</groupId>
+           <artifactId>maven-javadoc-plugin</artifactId>
+           <version>${mvn-javadoc-plugin.version}</version>
+           <configuration>
+             <additionalparam>-Xdoclint:none</additionalparam>
+           </configuration>
+           <executions>
+             <execution>
+               <id>javadoc-jar</id>
+               <phase>package</phase>
+               <goals>
+                 <goal>aggregate-jar</goal>
+               </goals>
+             </execution>
+           </executions>
+         </plugin>
+       </plugins>
+     </build>
+   </profile>
+ </profiles>
+
+</project>