You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/06/18 09:35:25 UTC

[9/9] git commit: FLUME-2070. Add a Flume Morphline Solr Sink.

FLUME-2070. Add a Flume Morphline Solr Sink.

(Wolfgang Hoschek via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/cf629841
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/cf629841
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/cf629841

Branch: refs/heads/trunk
Commit: cf6298415caca606d44fb8a1833337a23d6088d4
Parents: 296fc9f
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Jun 18 00:32:34 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Jun 18 00:32:34 2013 -0700

----------------------------------------------------------------------
 flume-ng-dist/pom.xml                           |    4 +
 .../flume-ng-morphline-solr-sink/README.md      |   20 +
 .../flume-ng-morphline-solr-sink/pom.xml        |  202 ++
 .../sink/solr/morphline/BlobDeserializer.java   |  161 ++
 .../flume/sink/solr/morphline/BlobHandler.java  |  126 ++
 .../sink/solr/morphline/MorphlineHandler.java   |   62 +
 .../solr/morphline/MorphlineHandlerImpl.java    |  134 ++
 .../solr/morphline/MorphlineInterceptor.java    |  246 +++
 .../sink/solr/morphline/MorphlineSink.java      |  201 ++
 .../sink/solr/morphline/MorphlineSolrSink.java  |   49 +
 .../sink/solr/morphline/UUIDInterceptor.java    |  115 ++
 .../sink/solr/morphline/EmbeddedSource.java     |   45 +
 .../FlumeHttpServletRequestWrapper.java         |  321 +++
 .../ResettableTestStringInputStream.java        |   89 +
 .../solr/morphline/TestBlobDeserializer.java    |  122 ++
 .../sink/solr/morphline/TestBlobHandler.java    |   60 +
 .../sink/solr/morphline/TestEnvironment.java    |   33 +
 .../morphline/TestMorphlineInterceptor.java     |  156 ++
 .../solr/morphline/TestMorphlineSolrSink.java   |  430 ++++
 .../solr/morphline/TestUUIDInterceptor.java     |   63 +
 .../src/test/resources/custom-mimetypes.xml     |   38 +
 .../src/test/resources/log4j.properties         |   34 +
 .../solr/collection1/conf/currency.xml          |   67 +
 .../resources/solr/collection1/conf/elevate.xml |   38 +
 .../collection1/conf/lang/contractions_ca.txt   |    8 +
 .../collection1/conf/lang/contractions_fr.txt   |    9 +
 .../collection1/conf/lang/contractions_ga.txt   |    5 +
 .../collection1/conf/lang/contractions_it.txt   |   23 +
 .../collection1/conf/lang/hyphenations_ga.txt   |    5 +
 .../solr/collection1/conf/lang/stemdict_nl.txt  |    6 +
 .../solr/collection1/conf/lang/stoptags_ja.txt  |  420 ++++
 .../solr/collection1/conf/lang/stopwords_ar.txt |  125 ++
 .../solr/collection1/conf/lang/stopwords_bg.txt |  193 ++
 .../solr/collection1/conf/lang/stopwords_ca.txt |  220 +++
 .../solr/collection1/conf/lang/stopwords_cz.txt |  172 ++
 .../solr/collection1/conf/lang/stopwords_da.txt |  108 ++
 .../solr/collection1/conf/lang/stopwords_de.txt |  292 +++
 .../solr/collection1/conf/lang/stopwords_el.txt |   78 +
 .../solr/collection1/conf/lang/stopwords_en.txt |   54 +
 .../solr/collection1/conf/lang/stopwords_es.txt |  354 ++++
 .../solr/collection1/conf/lang/stopwords_eu.txt |   99 +
 .../solr/collection1/conf/lang/stopwords_fa.txt |  313 +++
 .../solr/collection1/conf/lang/stopwords_fi.txt |   95 +
 .../solr/collection1/conf/lang/stopwords_fr.txt |  183 ++
 .../solr/collection1/conf/lang/stopwords_ga.txt |  110 ++
 .../solr/collection1/conf/lang/stopwords_gl.txt |  161 ++
 .../solr/collection1/conf/lang/stopwords_hi.txt |  235 +++
 .../solr/collection1/conf/lang/stopwords_hu.txt |  209 ++
 .../solr/collection1/conf/lang/stopwords_hy.txt |   46 +
 .../solr/collection1/conf/lang/stopwords_id.txt |  359 ++++
 .../solr/collection1/conf/lang/stopwords_it.txt |  301 +++
 .../solr/collection1/conf/lang/stopwords_ja.txt |  127 ++
 .../solr/collection1/conf/lang/stopwords_lv.txt |  172 ++
 .../solr/collection1/conf/lang/stopwords_nl.txt |  117 ++
 .../solr/collection1/conf/lang/stopwords_no.txt |  192 ++
 .../solr/collection1/conf/lang/stopwords_pt.txt |  251 +++
 .../solr/collection1/conf/lang/stopwords_ro.txt |  233 +++
 .../solr/collection1/conf/lang/stopwords_ru.txt |  241 +++
 .../solr/collection1/conf/lang/stopwords_sv.txt |  131 ++
 .../solr/collection1/conf/lang/stopwords_th.txt |  119 ++
 .../solr/collection1/conf/lang/stopwords_tr.txt |  212 ++
 .../solr/collection1/conf/lang/userdict_ja.txt  |   29 +
 .../solr/collection1/conf/protwords.txt         |   21 +
 .../resources/solr/collection1/conf/schema.xml  |  947 +++++++++
 .../solr/collection1/conf/solrconfig.xml        | 1828 ++++++++++++++++++
 .../solr/collection1/conf/stopwords.txt         |   14 +
 .../solr/collection1/conf/synonyms.txt          |   29 +
 .../resources/test-documents/NullHeader.docx    |  Bin 0 -> 4355 bytes
 .../resources/test-documents/boilerplate.html   |   41 +
 .../src/test/resources/test-documents/cars.csv  |    6 +
 .../test/resources/test-documents/cars.csv.gz   |  Bin 0 -> 167 bytes
 .../src/test/resources/test-documents/cars.ssv  |    6 +
 .../test/resources/test-documents/cars.tar.gz   |  Bin 0 -> 10240 bytes
 .../src/test/resources/test-documents/cars.tsv  |    6 +
 .../test/resources/test-documents/complex.mbox  |  291 +++
 .../test-documents/multiline-sessions.log       |    9 +
 ...multiline-stacktrace-expected-long-event.log |   25 +
 .../test-documents/multiline-stacktrace.log     |   30 +
 ...gth-delimited-20130430-234145-tweets.json.gz |  Bin 0 -> 955 bytes
 .../test/resources/test-documents/rsstest.rss   |   36 +
 .../sample-statuses-20120906-141433             |    4 +
 .../sample-statuses-20120906-141433-medium.avro |  Bin 0 -> 249540 bytes
 ...mple-statuses-20120906-141433-subschema.avsc |   12 +
 .../sample-statuses-20120906-141433.avro        |  Bin 0 -> 1208 bytes
 .../sample-statuses-20120906-141433.avsc        |   57 +
 .../sample-statuses-20120906-141433.bz2         |  Bin 0 -> 1054 bytes
 .../sample-statuses-20120906-141433.gz          |  Bin 0 -> 907 bytes
 .../resources/test-documents/test-documents.7z  |  Bin 0 -> 71931 bytes
 .../test-documents/test-documents.cpio          |  Bin 0 -> 116224 bytes
 .../resources/test-documents/test-documents.tar |  Bin 0 -> 133120 bytes
 .../test-documents/test-documents.tbz2          |  Bin 0 -> 71127 bytes
 .../resources/test-documents/test-documents.tgz |  Bin 0 -> 69060 bytes
 .../resources/test-documents/test-documents.zip |  Bin 0 -> 68403 bytes
 .../resources/test-documents/test-outlook.msg   |  Bin 0 -> 19968 bytes
 .../test-documents/test-zip-of-zip.zip          |  Bin 0 -> 299 bytes
 .../test/resources/test-documents/testAIFF.aif  |  Bin 0 -> 3894 bytes
 .../test/resources/test-documents/testBMP.bmp   |  Bin 0 -> 22554 bytes
 .../test/resources/test-documents/testBMPfp.txt |    3 +
 .../test/resources/test-documents/testDITA.dita |   34 +
 .../test/resources/test-documents/testEMLX.emlx |   55 +
 .../test/resources/test-documents/testEXCEL.xls |  Bin 0 -> 13824 bytes
 .../resources/test-documents/testEXCEL.xlsx     |  Bin 0 -> 9453 bytes
 .../test/resources/test-documents/testFLAC.flac |  Bin 0 -> 10604 bytes
 .../test/resources/test-documents/testFLAC.oga  |  Bin 0 -> 10820 bytes
 .../test/resources/test-documents/testFLV.flv   |  Bin 0 -> 90580 bytes
 .../test/resources/test-documents/testGIF.gif   |  Bin 0 -> 8495 bytes
 .../test/resources/test-documents/testJAR.jar   |  Bin 0 -> 441 bytes
 .../resources/test-documents/testJPEG_EXIF.jpg  |  Bin 0 -> 16357 bytes
 .../test-documents/testJPEG_EXIF.jpg.gz         |  Bin 0 -> 8595 bytes
 .../test-documents/testJPEG_EXIF.jpg.tar.gz     |  Bin 0 -> 8722 bytes
 .../test/resources/test-documents/testKML.kml   |  917 +++++++++
 .../resources/test-documents/testKeynote.key    |  Bin 0 -> 221745 bytes
 .../resources/test-documents/testMP3i18n.mp3    |  Bin 0 -> 40832 bytes
 .../test/resources/test-documents/testMP4.m4a   |  Bin 0 -> 4770 bytes
 .../test-documents/testNumbers.numbers          |  Bin 0 -> 134571 bytes
 .../test/resources/test-documents/testPDF.pdf   |  Bin 0 -> 34824 bytes
 .../test/resources/test-documents/testPNG.png   |  Bin 0 -> 17041 bytes
 .../test/resources/test-documents/testPPM.ppm   |    4 +
 .../test-documents/testPPT_various.ppt          |  Bin 0 -> 164352 bytes
 .../test-documents/testPPT_various.pptx         |  Bin 0 -> 56659 bytes
 .../test/resources/test-documents/testPSD.psd   |  Bin 0 -> 69410 bytes
 .../resources/test-documents/testPages.pages    |  Bin 0 -> 134152 bytes
 .../test/resources/test-documents/testRDF.rdf   |   23 +
 .../test/resources/test-documents/testRFC822    |   41 +
 .../resources/test-documents/testRTFVarious.rtf |  329 ++++
 .../test/resources/test-documents/testSVG.svg   |    7 +
 .../test/resources/test-documents/testTIFF.tif  |  Bin 0 -> 25584 bytes
 .../resources/test-documents/testTrueType.ttf   |  Bin 0 -> 98724 bytes
 .../test/resources/test-documents/testVISIO.vsd |  Bin 0 -> 45568 bytes
 .../resources/test-documents/testVORBIS.ogg     |  Bin 0 -> 4241 bytes
 .../test/resources/test-documents/testWAR.war   |  Bin 0 -> 1003 bytes
 .../test/resources/test-documents/testWAV.wav   |  Bin 0 -> 3884 bytes
 .../resources/test-documents/testWINMAIL.dat    |  Bin 0 -> 66276 bytes
 .../test/resources/test-documents/testWMA.wma   |  Bin 0 -> 27747 bytes
 .../test/resources/test-documents/testWMF.wmf   |  Bin 0 -> 51590 bytes
 .../test/resources/test-documents/testWMV.wmv   |  Bin 0 -> 113878 bytes
 .../test-documents/testWORD_various.doc         |  Bin 0 -> 35328 bytes
 .../test-documents/testWindows-x86-32.exe       |  Bin 0 -> 11723 bytes
 .../test/resources/test-documents/testXML.xml   |   48 +
 .../grokIfNotMatchDropRecord.conf               |   75 +
 .../test-morphlines/ifDetectMimeType.conf       |   74 +
 .../resources/test-morphlines/noOperation.conf  |   27 +
 .../resources/test-morphlines/readClob.conf     |   32 +
 .../test-morphlines/solrCellDocumentTypes.conf  |  260 +++
 flume-ng-sinks/pom.xml                          |    1 +
 pom.xml                                         |    8 +-
 146 files changed, 14122 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index 9a5f64e..302c147 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -113,6 +113,10 @@
       <artifactId>flume-ng-elasticsearch-sink</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.flume.flume-ng-sinks</groupId>
+      <artifactId>flume-ng-morphline-solr-sink</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.flume.flume-ng-sources</groupId>
       <artifactId>flume-scribe-source</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/README.md
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/README.md b/flume-ng-sinks/flume-ng-morphline-solr-sink/README.md
new file mode 100644
index 0000000..ede3ab7
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/README.md
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Flume Morphline Solr Sink
+
+This module contains a Flume Morphline Solr Sink that extracts search documents from Flume events, transforms them and loads them in Near Real Time into Apache Solr, typically a SolrCloud. This sink is intended to be used alongside the HdfsSink. It is designed to process not just structured data, but also arbitrary raw data, including data from many heterogeneous data sources.

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml b/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml
new file mode 100644
index 0000000..b42f650
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-ng-sinks</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.4.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume.flume-ng-sinks</groupId>
+  <artifactId>flume-ng-morphline-solr-sink</artifactId>
+  <version>1.4.0-SNAPSHOT</version>
+  <name>Flume NG Morphline Solr Sink</name>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <solr.version>4.3.0</solr.version>
+    <solr.expected.version>4.3.0</solr.expected.version> <!-- sanity check to verify we actually run against the expected version rather than some outdated version -->
+    <tika.version>1.3</tika.version>
+    <cdk.version>0.9.1-cdh4.3.0-SNAPSHOT</cdk.version>
+    <slf4j.version>1.6.1</slf4j.version>
+    <surefire.version>2.12.4</surefire.version>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>cdh.repo</id>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+      <name>Cloudera Repositories</name>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+
+    <repository>
+      <id>cdh.snapshots.repo</id>
+      <url>https://repository.cloudera.com/artifactory/libs-snapshot-local</url>
+      <name>Cloudera Snapshots Repository</name>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+    </repository>
+  </repositories>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jcl-over-slf4j</artifactId>
+      <version>${slf4j.version}</version> <!-- flume provides 1.7.2 and solr depends on 1.6.4 -->
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency> <!-- see http://lucene.apache.org/solr -->
+      <groupId>org.apache.solr</groupId>
+      <artifactId>solr-test-framework</artifactId>
+      <version>${solr.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.cloudera.search</groupId>
+      <artifactId>cdk-morphlines-solr-cell</artifactId>
+      <version>${cdk.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.cloudera.search</groupId>
+      <artifactId>cdk-morphlines-avro</artifactId>
+      <version>${cdk.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.cloudera.search</groupId>
+      <artifactId>cdk-morphlines-twitter</artifactId>
+      <version>${cdk.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.cloudera.search</groupId>
+      <artifactId>cdk-morphlines-tika-core</artifactId>
+      <version>${cdk.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.cloudera.search</groupId>
+      <artifactId>cdk-morphlines-tika-decompress</artifactId>
+      <version>${cdk.version}</version>
+    </dependency>
+
+    <dependency> <!-- see http://tika.apache.org -->
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-xmp</artifactId>
+      <version>${tika.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.geronimo.specs</groupId>
+          <artifactId>geronimo-stax-api_1.0_spec</artifactId> <!-- needed by tika-parsers but already provided by JDK -->
+        </exclusion>
+        <exclusion>
+          <groupId>xerces</groupId>
+          <artifactId>xercesImpl</artifactId> <!-- used by com.drewnoakes:metadata-extractor:jar, but replacing the built-in XML parser with legacy xerces is risky and it is probably not needed -->
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.cloudera.search</groupId>
+      <artifactId>cdk-morphlines-solr-core</artifactId>
+      <version>${cdk.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency> <!-- see http://lucene.apache.org/solr -->
+      <groupId>org.apache.solr</groupId>
+      <artifactId>solr-core</artifactId>
+      <version>${solr.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId> <!-- instead use slf4j on top of log4j or logback  -->
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${surefire.version}</version>
+        <configuration>
+          <argLine>-Dtests.locale=en_us</argLine>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+          <systemPropertyVariables>
+            <!--<solr.expected.version>${solr.expected.version}</solr.expected.version>-->
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>test.rat</id>
+            <phase>test</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+            <configuration>
+              <excludes>
+                <exclude>src/test/resources/**</exclude>
+              </excludes>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java
new file mode 100644
index 0000000..12bdc40
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.solr.morphline;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.serialization.EventDeserializer;
+import org.apache.flume.serialization.ResettableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * A deserializer that reads a Binary Large Object (BLOB) per event, typically
+ * one BLOB per file; To be used in conjunction with Flume SpoolDirectorySource.
+ * <p>
+ * Note that this approach is not suitable for very large objects because it
+ * buffers up the entire BLOB.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlobDeserializer implements EventDeserializer {
+
+  private ResettableInputStream in;
+  private final int maxBlobLength;
+  private volatile boolean isOpen;
+
+  public static final String MAX_BLOB_LENGTH_KEY = "maxBlobLength";
+  public static final int MAX_BLOB_LENGTH_DEFAULT = 100 * 1000 * 1000;
+
+  private static final int DEFAULT_BUFFER_SIZE = 1024 * 8;
+  private static final Logger LOGGER = LoggerFactory.getLogger(BlobDeserializer.class);
+      
+  protected BlobDeserializer(Context context, ResettableInputStream in) {
+    this.in = in;
+    this.maxBlobLength = context.getInteger(MAX_BLOB_LENGTH_KEY, MAX_BLOB_LENGTH_DEFAULT);
+    if (this.maxBlobLength <= 0) {
+      throw new ConfigurationException("Configuration parameter " + MAX_BLOB_LENGTH_KEY
+          + " must be greater than zero: " + maxBlobLength);
+    }
+    this.isOpen = true;
+  }
+
+  /**
+   * Reads a BLOB from a file and returns an event
+   * @return Event containing a BLOB
+   * @throws IOException
+   */
+  @SuppressWarnings("resource")
+  @Override
+  public Event readEvent() throws IOException {
+    ensureOpen();
+    ByteArrayOutputStream blob = null;
+    byte[] buf = new byte[Math.min(maxBlobLength, DEFAULT_BUFFER_SIZE)];
+    int blobLength = 0;
+    int n = 0;
+    while ((n = in.read(buf, 0, Math.min(buf.length, maxBlobLength - blobLength))) != -1) {
+      if (blob == null) {
+        blob = new ByteArrayOutputStream(n);
+      }
+      blob.write(buf, 0, n);
+      blobLength += n;
+      if (blobLength >= maxBlobLength) {
+        LOGGER.warn("File length exceeds maxBlobLength ({}), truncating BLOB event!", maxBlobLength);
+        break;
+      }
+    }
+    
+    if (blob == null) {
+      return null;
+    } else {
+      return EventBuilder.withBody(blob.toByteArray());
+    }
+  }
+  
+  /**
+   * Batch BLOB read
+   * @param numEvents Maximum number of events to return.
+   * @return List of events containing read BLOBs
+   * @throws IOException
+   */
+  @Override
+  public List<Event> readEvents(int numEvents) throws IOException {
+    ensureOpen();
+    List<Event> events = Lists.newLinkedList();
+    for (int i = 0; i < numEvents; i++) {
+      Event event = readEvent();
+      if (event != null) {
+        events.add(event);
+      } else {
+        break;
+      }
+    }
+    return events;
+  }
+
+  @Override
+  public void mark() throws IOException {
+    ensureOpen();
+    in.mark();
+  }
+
+  @Override
+  public void reset() throws IOException {
+    ensureOpen();
+    in.reset();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (isOpen) {
+      reset();
+      in.close();
+      isOpen = false;
+    }
+  }
+
+  private void ensureOpen() {
+    if (!isOpen) {
+      throw new IllegalStateException("Serializer has been closed");
+    }
+  }
+
+  
+  ///////////////////////////////////////////////////////////////////////////////
+  // Nested classes:
+  ///////////////////////////////////////////////////////////////////////////////
+  /** Builder implementations MUST have a public no-arg constructor */
+  public static class Builder implements EventDeserializer.Builder {
+
+    @Override
+    public BlobDeserializer build(Context context, ResettableInputStream in) {      
+      return new BlobDeserializer(context, in);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
new file mode 100644
index 0000000..e84dec1
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.solr.morphline;
+
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.http.HTTPSourceHandler;
+import org.apache.tika.metadata.Metadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BlobHandler for HTTPSource that returns event that contains the request
+ * parameters as well as the Binary Large Object (BLOB) uploaded with this
+ * request.
+ * <p>
+ * Note that this approach is not suitable for very large objects because it
+ * buffers up the entire BLOB.
+ * <p>
+ * Example client usage:
+ * <pre>
+ * curl --data-binary @sample-statuses-20120906-141433-medium.avro 'http://127.0.0.1:5140?resourceName=sample-statuses-20120906-141433-medium.avro' --header 'Content-Type:application/octet-stream' --verbose
+ * </pre>
+ */
+public class BlobHandler implements HTTPSourceHandler {
+
+  private int maxBlobLength = MAX_BLOB_LENGTH_DEFAULT;
+
+  public static final String MAX_BLOB_LENGTH_KEY = "maxBlobLength";
+  public static final int MAX_BLOB_LENGTH_DEFAULT = 100 * 1000 * 1000;
+
+  private static final int DEFAULT_BUFFER_SIZE = 1024 * 8;
+  private static final Logger LOGGER = LoggerFactory.getLogger(BlobHandler.class);
+
+  public BlobHandler() {
+  }
+
+  @Override
+  public void configure(Context context) {
+    this.maxBlobLength = context.getInteger(MAX_BLOB_LENGTH_KEY, MAX_BLOB_LENGTH_DEFAULT);
+    if (this.maxBlobLength <= 0) {
+      throw new ConfigurationException("Configuration parameter " + MAX_BLOB_LENGTH_KEY
+          + " must be greater than zero: " + maxBlobLength);
+    }
+  }
+  
+  @SuppressWarnings("resource")
+  @Override
+  public List<Event> getEvents(HttpServletRequest request) throws Exception {
+    Map<String, String> headers = getHeaders(request);    
+    InputStream in = request.getInputStream();
+    try {
+      ByteArrayOutputStream blob = null;
+      byte[] buf = new byte[Math.min(maxBlobLength, DEFAULT_BUFFER_SIZE)];
+      int blobLength = 0;
+      int n = 0;
+      while ((n = in.read(buf, 0, Math.min(buf.length, maxBlobLength - blobLength))) != -1) {
+        if (blob == null) {
+          blob = new ByteArrayOutputStream(n);
+        }
+        blob.write(buf, 0, n);
+        blobLength += n;
+        if (blobLength >= maxBlobLength) {
+          LOGGER.warn("Request length exceeds maxBlobLength ({}), truncating BLOB event!", maxBlobLength);
+          break;
+        }
+      }
+
+      byte[] array = blob != null ? blob.toByteArray() : new byte[0];
+      Event event = EventBuilder.withBody(array, headers);
+      LOGGER.debug("blobEvent: {}", event);
+      return Collections.singletonList(event);
+    } finally {
+      in.close();
+    }
+  }
+
+  private Map<String, String> getHeaders(HttpServletRequest request) {
+    if (LOGGER.isDebugEnabled()) {
+      Map requestHeaders = new HashMap();
+      Enumeration iter = request.getHeaderNames();
+      while (iter.hasMoreElements()) {
+        String name = (String) iter.nextElement();
+        requestHeaders.put(name, request.getHeader(name));        
+      }
+      LOGGER.debug("requestHeaders: {}", requestHeaders);
+    }    
+    Map<String, String> headers = new HashMap();
+    if (request.getContentType() != null) {
+      headers.put(Metadata.CONTENT_TYPE, request.getContentType());
+    }
+    Enumeration iter = request.getParameterNames();
+    while (iter.hasMoreElements()) {
+      String name = (String) iter.nextElement();
+      headers.put(name, request.getParameter(name));        
+    }
+    return headers;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandler.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandler.java
new file mode 100644
index 0000000..bb5191d
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.solr.morphline;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+
+/**
+ * Interface to load Flume events into Solr
+ */
+public interface MorphlineHandler extends Configurable {
+
+  /** Begins a transaction */
+  public void beginTransaction();
+
+  /** Loads the given event into Solr */
+  public void process(Event event);
+
+  /**
+   * Sends any outstanding documents to Solr and waits for a positive
+   * or negative ack (i.e. exception). Depending on the outcome the caller
+   * should then commit or rollback the current flume transaction
+   * correspondingly.
+   * <p>
+   * No checked exception is declared here, so a failure can only be
+   * reported to the caller as an unchecked exception.
+   */
+  public void commitTransaction();
+
+  /**
+   * Performs a rollback of all non-committed documents pending.
+   * <p>
+   * Note that this is not a true rollback as in databases. Content you have previously added to
+   * Solr may have already been committed due to autoCommit, buffer full, other client performing a
+   * commit etc. So this is only a best-effort rollback.
+   * <p>
+   * No checked exception is declared here, so a failure can only be
+   * reported to the caller as an unchecked exception.
+   */
+  public void rollbackTransaction();
+
+  /** Releases allocated resources */
+  public void stop();
+
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
new file mode 100644
index 0000000..ea76322
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.solr.morphline;
+
+import java.io.File;
+import java.util.Map.Entry;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.cdk.morphline.api.Command;
+import com.cloudera.cdk.morphline.api.MorphlineCompilationException;
+import com.cloudera.cdk.morphline.api.MorphlineContext;
+import com.cloudera.cdk.morphline.api.Record;
+import com.cloudera.cdk.morphline.base.Compiler;
+import com.cloudera.cdk.morphline.base.FaultTolerance;
+import com.cloudera.cdk.morphline.base.Fields;
+import com.cloudera.cdk.morphline.base.Notifications;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * A {@link MorphlineHandler} that runs each Flume event through a morphline {@link Command} chain.
+ */
+public class MorphlineHandlerImpl implements MorphlineHandler {
+
+  public static final String MORPHLINE_FILE_PARAM = "morphlineFile";
+  public static final String MORPHLINE_ID_PARAM = "morphlineId";
+
+  /**
+   * Morphline variables can be passed from flume.conf to the morphline, e.g.:
+   * agent.sinks.solrSink.morphlineVariable.zkHost=127.0.0.1:2181/solr
+   */
+  public static final String MORPHLINE_VARIABLE_PARAM = "morphlineVariable";
+
+  private static final Logger LOG = LoggerFactory.getLogger(MorphlineHandlerImpl.class);
+
+  private MorphlineContext morphlineContext;
+  private Command morphline;
+  private Command finalChild;
+  private String morphlineFileAndId;
+
+  /** Test hook: a context set here is reused by configure() instead of building a new one. */
+  void setMorphlineContext(MorphlineContext morphlineContext) {
+    this.morphlineContext = morphlineContext;
+  }
+
+  /** For the interceptor: the command passed to the compiler as the chain's final child. */
+  void setFinalChild(Command finalChild) {
+    this.finalChild = finalChild;
+  }
+
+  /** Builds the morphline context (unless one was injected) and compiles the morphline file. */
+  @Override
+  public void configure(Context context) {
+    if (morphlineContext == null) {
+      boolean isProduction = context.getBoolean(FaultTolerance.IS_PRODUCTION_MODE, false);
+      boolean isLenient =
+          context.getBoolean(FaultTolerance.IS_IGNORING_RECOVERABLE_EXCEPTIONS, false);
+      String recoverable = context.getString(FaultTolerance.RECOVERABLE_EXCEPTION_CLASSES);
+      morphlineContext = new MorphlineContext.Builder()
+        .setExceptionHandler(new FaultTolerance(isProduction, isLenient, recoverable))
+        .setMetricRegistry(new MetricRegistry())
+        .build();
+    }
+    String file = context.getString(MORPHLINE_FILE_PARAM);
+    String id = context.getString(MORPHLINE_ID_PARAM);
+    boolean isFileMissing = file == null || file.trim().isEmpty();
+    if (isFileMissing) {
+      throw new MorphlineCompilationException("Missing parameter: " + MORPHLINE_FILE_PARAM, null);
+    }
+    Config variables = ConfigFactory.parseMap(context.getSubProperties(MORPHLINE_VARIABLE_PARAM + "."));
+    morphline = new Compiler().compile(new File(file), id, morphlineContext, finalChild, variables);
+    morphlineFileAndId = file + "@" + id;
+  }
+
+  /** Copies the event's headers and non-empty body into a record and runs the morphline on it. */
+  @Override
+  public void process(Event event) {
+    Record record = new Record();
+    for (Entry<String, String> header : event.getHeaders().entrySet()) {
+      record.put(header.getKey(), header.getValue());
+    }
+    byte[] body = event.getBody();
+    if (body != null && body.length > 0) {
+      record.put(Fields.ATTACHMENT_BODY, body);
+    }
+    try {
+      Notifications.notifyStartSession(morphline);
+      if (!morphline.process(record)) {
+        LOG.warn("Morphline {} failed to process record: {}", morphlineFileAndId, record);
+      }
+    } catch (RuntimeException e) {
+      morphlineContext.getExceptionHandler().handleException(e, record);
+    }
+  }
+
+  @Override
+  public void beginTransaction() {
+    Notifications.notifyBeginTransaction(morphline);
+  }
+
+  @Override
+  public void commitTransaction() {
+    Notifications.notifyCommitTransaction(morphline);
+  }
+
+  @Override
+  public void rollbackTransaction() {
+    Notifications.notifyRollbackTransaction(morphline);
+  }
+
+  @Override
+  public void stop() {
+    Notifications.notifyShutdown(morphline);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
new file mode 100644
index 0000000..ac0ccb6
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.solr.morphline;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.interceptor.Interceptor;
+
+import com.cloudera.cdk.morphline.api.Command;
+import com.cloudera.cdk.morphline.api.Record;
+import com.cloudera.cdk.morphline.base.Fields;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+
+/**
+ * Flume Interceptor that executes a morphline on events that are intercepted.
+ *
+ * Currently, there is a restriction in that the morphline must not generate more than one output
+ * record for each input event.
+ */
+public class MorphlineInterceptor implements Interceptor {
+
+  private final Context context;
+  // idle workers that are not currently borrowed by an intercept call
+  private final BlockingQueue<LocalMorphlineInterceptor> pool =
+      new LinkedBlockingQueue<LocalMorphlineInterceptor>();
+
+  protected MorphlineInterceptor(Context context) {
+    Preconditions.checkNotNull(context);
+    this.context = context;
+    returnToPool(new LocalMorphlineInterceptor(context)); // fail fast on morphline compilation exception
+  }
+
+  @Override
+  public void initialize() {
+  }
+
+  /** Stops the morphline of every worker that is currently in the pool. */
+  @Override
+  public void close() {
+    List<LocalMorphlineInterceptor> interceptors = new ArrayList<LocalMorphlineInterceptor>();
+    pool.drainTo(interceptors);
+    for (LocalMorphlineInterceptor interceptor : interceptors) {
+      interceptor.close();
+    }
+  }
+
+  /** Runs the whole batch through a single borrowed worker. */
+  @Override
+  public List<Event> intercept(List<Event> events) {
+    LocalMorphlineInterceptor interceptor = borrowFromPool();
+    try {
+      return interceptor.intercept(events);
+    } finally {
+      returnToPool(interceptor); // also on failure, so that close() can still stop its morphline
+    }
+  }
+
+  @Override
+  public Event intercept(Event event) {
+    LocalMorphlineInterceptor interceptor = borrowFromPool();
+    try {
+      return interceptor.intercept(event);
+    } finally {
+      returnToPool(interceptor); // also on failure, so that close() can still stop its morphline
+    }
+  }
+
+  private void returnToPool(LocalMorphlineInterceptor interceptor) {
+    // the queue is unbounded, so offer() never blocks and always succeeds
+    pool.offer(interceptor);
+  }
+
+  /** Hands out an idle worker, compiling a new morphline if the pool is currently empty. */
+  private LocalMorphlineInterceptor borrowFromPool() {
+    LocalMorphlineInterceptor interceptor = pool.poll();
+    if (interceptor == null) {
+      interceptor = new LocalMorphlineInterceptor(context);
+    }
+    return interceptor;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////
+  // Nested classes:
+  ///////////////////////////////////////////////////////////////////////////////
+  /** Builder implementations MUST have a public no-arg constructor */
+  public static class Builder implements Interceptor.Builder {
+
+    private Context context;
+
+    public Builder() {
+    }
+
+    @Override
+    public MorphlineInterceptor build() {
+      return new MorphlineInterceptor(context);
+    }
+
+    @Override
+    public void configure(Context context) {
+      this.context = context;
+    }
+
+  }
+
+  /** Worker that owns one compiled morphline plus the collector that captures its output. */
+  private static final class LocalMorphlineInterceptor implements Interceptor {
+
+    private final MorphlineHandlerImpl morphline;
+    private final Collector collector;
+
+    protected LocalMorphlineInterceptor(Context context) {
+      this.morphline = new MorphlineHandlerImpl();
+      this.collector = new Collector();
+      this.morphline.setFinalChild(collector);
+      this.morphline.configure(context);
+    }
+
+    @Override
+    public void initialize() {
+    }
+
+    @Override
+    public void close() {
+      morphline.stop();
+    }
+
+    @Override
+    public List<Event> intercept(List<Event> events) {
+      List<Event> results = new ArrayList<Event>(events.size());
+      for (Event event : events) {
+        event = intercept(event);
+        if (event != null) {
+          results.add(event);
+        }
+      }
+      return results;
+    }
+
+    /** Returns the single event the morphline produced, or null if it produced none. */
+    @Override
+    public Event intercept(Event event) {
+      collector.reset();
+      morphline.process(event);
+      List<Record> results = collector.getRecords();
+      if (results.isEmpty()) {
+        return null;
+      }
+      if (results.size() > 1) {
+        throw new FlumeException(getClass().getName()
+            + " must not generate more than one output record per input event");
+      }
+      return toEvent(results.get(0));
+    }
+
+    /** Maps a record to an event: the attachment body becomes the body, other fields headers. */
+    private Event toEvent(Record record) {
+      Map<String, String> headers = new HashMap<String, String>();
+      Map<String, Collection<Object>> recordMap = record.getFields().asMap();
+      byte[] body = null;
+      for (Map.Entry<String, Collection<Object>> entry : recordMap.entrySet()) {
+        if (entry.getValue().size() > 1) {
+          throw new FlumeException(getClass().getName()
+              + " must not generate more than one output value per record field");
+        }
+        assert entry.getValue().size() != 0; // guava guarantees that
+        Object firstValue = entry.getValue().iterator().next();
+        if (Fields.ATTACHMENT_BODY.equals(entry.getKey())) {
+          if (firstValue instanceof byte[]) {
+            body = (byte[]) firstValue;
+          } else if (firstValue instanceof InputStream) {
+            try {
+              body = ByteStreams.toByteArray((InputStream) firstValue);
+            } catch (IOException e) {
+              throw new FlumeException(e);
+            }
+          } else {
+            throw new FlumeException(getClass().getName()
+                + " must not generate attachments that are not a byte[] or InputStream");
+          }
+        } else {
+          headers.put(entry.getKey(), firstValue.toString());
+        }
+      }
+      return EventBuilder.withBody(body, headers);
+    }
+  }
+
+  /** Final command of the chain; remembers every record passed to it until the next reset(). */
+  private static final class Collector implements Command {
+
+    private final List<Record> results = new ArrayList<Record>();
+
+    public List<Record> getRecords() {
+      return results;
+    }
+
+    public void reset() {
+      results.clear();
+    }
+
+    @Override
+    public Command getParent() {
+      return null;
+    }
+
+    @Override
+    public void notify(Record notification) {
+    }
+
+    @Override
+    public boolean process(Record record) {
+      Preconditions.checkNotNull(record);
+      results.add(record);
+      return true;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
new file mode 100644
index 0000000..ea637c4
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.solr.morphline;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.cdk.morphline.api.Command;
+
+/**
+ * Flume sink that extracts search documents from Flume events and processes them using a morphline
+ * {@link Command} chain.
+ */
+public class MorphlineSink extends AbstractSink implements Configurable {
+
+  private int maxBatchSize = 100;
+  private long maxBatchDurationMillis = 1000;
+  private String handlerClass;
+  private MorphlineHandler handler;
+  private Context context;
+  private SinkCounter sinkCounter;
+
+  public static final String BATCH_SIZE = "batchSize";
+  public static final String BATCH_DURATION_MILLIS = "batchDurationMillis";
+  public static final String HANDLER_CLASS = "handlerClass";
+  private static final Logger LOGGER = LoggerFactory.getLogger(MorphlineSink.class);
+
+  public MorphlineSink() {
+    this(null);
+  }
+
+  /** For testing only */
+  protected MorphlineSink(MorphlineHandler handler) {
+    this.handler = handler;
+  }
+
+  /** Reads batch size, batch duration and handler class; the handler is created in start(). */
+  @Override
+  public void configure(Context context) {
+    this.context = context;
+    maxBatchSize = context.getInteger(BATCH_SIZE, maxBatchSize);
+    maxBatchDurationMillis = context.getLong(BATCH_DURATION_MILLIS, maxBatchDurationMillis);
+    handlerClass = context.getString(HANDLER_CLASS, MorphlineHandlerImpl.class.getName());
+    if (sinkCounter == null) {
+      sinkCounter = new SinkCounter(getName());
+    }
+  }
+
+  /**
+   * Returns the maximum number of events to take per flume transaction,
+   * as configured via {@link #BATCH_SIZE}
+   */
+  private int getMaxBatchSize() {
+    return maxBatchSize;
+  }
+
+  /** Returns the maximum duration per flume transaction, see {@link #BATCH_DURATION_MILLIS} */
+  private long getMaxBatchDurationMillis() {
+    return maxBatchDurationMillis;
+  }
+
+  @Override
+  public synchronized void start() {
+    LOGGER.info("Starting Morphline Sink {} ...", this);
+    sinkCounter.start();
+    if (handler == null) {
+      MorphlineHandler tmpHandler;
+      try {
+        tmpHandler = (MorphlineHandler) Class.forName(handlerClass).newInstance();
+      } catch (Exception e) {
+        throw new ConfigurationException(e);
+      }
+      tmpHandler.configure(context);
+      handler = tmpHandler;
+    }
+    super.start();
+    LOGGER.info("Morphline Sink {} started.", getName());
+  }
+
+  /** Stops the handler and the counter; super.stop() runs even if either of them throws. */
+  @Override
+  public synchronized void stop() {
+    LOGGER.info("Morphline Sink {} stopping...", getName());
+    try {
+      if (handler != null) {
+        handler.stop();
+      }
+      sinkCounter.stop();
+      LOGGER.info("Morphline Sink {} stopped. Metrics: {}", getName(), sinkCounter);
+    } finally {
+      super.stop();
+    }
+  }
+
+  /** Moves up to one batch of events from the channel to the handler in a single transaction. */
+  @Override
+  public Status process() throws EventDeliveryException {
+    int batchSize = getMaxBatchSize();
+    long batchEndTime = System.currentTimeMillis() + getMaxBatchDurationMillis();
+    Channel myChannel = getChannel();
+    Transaction txn = myChannel.getTransaction();
+    txn.begin();
+    boolean isMorphlineTransactionCommitted = true;
+    try {
+      int numEventsTaken = 0;
+      handler.beginTransaction();
+      isMorphlineTransactionCommitted = false;
+
+      // repeatedly take and process events from the Flume queue
+      for (int i = 0; i < batchSize; i++) {
+        Event event = myChannel.take();
+        if (event == null) {
+          break;
+        }
+        numEventsTaken++;
+        LOGGER.debug("Flume event: {}", event);
+        handler.process(event);
+        if (System.currentTimeMillis() >= batchEndTime) {
+          break;
+        }
+      }
+
+      // update metrics; an empty batch counts as empty only, not also as underflow
+      if (numEventsTaken == 0) {
+        sinkCounter.incrementBatchEmptyCount();
+      } else if (numEventsTaken < batchSize) {
+        sinkCounter.incrementBatchUnderflowCount();
+      } else {
+        sinkCounter.incrementBatchCompleteCount();
+      }
+      sinkCounter.addToEventDrainAttemptCount(numEventsTaken);
+
+      handler.commitTransaction();
+      isMorphlineTransactionCommitted = true;
+      txn.commit();
+      // events only count as drained once both transactions have been committed
+      sinkCounter.addToEventDrainSuccessCount(numEventsTaken);
+      return numEventsTaken == 0 ? Status.BACKOFF : Status.READY;
+    } catch (Throwable t) {
+      // Ooops - need to rollback and back off
+      LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " + myChannel.getName()
+            + ". Exception follows.", t);
+      try {
+        if (!isMorphlineTransactionCommitted) {
+          handler.rollbackTransaction();
+        }
+      } catch (Throwable t2) {
+        LOGGER.error("Morphline Sink " + getName() + ": Unable to rollback morphline transaction. " +
+            "Exception follows.", t2);
+      } finally {
+        try {
+          txn.rollback();
+        } catch (Throwable t4) {
+          LOGGER.error("Morphline Sink " + getName() + ": Unable to rollback Flume transaction. " +
+              "Exception follows.", t4);
+        }
+      }
+
+      if (t instanceof Error) {
+        throw (Error) t; // rethrow original exception
+      } else if (t instanceof ChannelException) {
+        return Status.BACKOFF;
+      } else {
+        throw new EventDeliveryException("Failed to send events", t); // rethrow and backoff
+      }
+    } finally {
+      txn.close();
+    }
+  }
+
+  @Override
+  public String toString() {
+    int i = getClass().getName().lastIndexOf('.') + 1;
+    String shortClassName = getClass().getName().substring(i);
+    return getName() + " (" + shortClassName + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSolrSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSolrSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSolrSink.java
new file mode 100644
index 0000000..bc07a2e
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSolrSink.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.solr.morphline;
+
+import org.apache.flume.Context;
+
+import com.cloudera.cdk.morphline.api.Command;
+import com.cloudera.cdk.morphline.base.FaultTolerance;
+
+
+/**
+ * Flume sink that extracts search documents from Flume events, processes them using a morphline
+ * {@link Command} chain, and loads them into Apache Solr.
+ */
+public class MorphlineSolrSink extends MorphlineSink {
+
+  public MorphlineSolrSink() {
+    super();
+  }
+
+  /** For testing only */
+  protected MorphlineSolrSink(MorphlineHandler handler) {
+    super(handler);
+  }
+
+  /** Registers SolrServerException as recoverable unless the context already names classes. */
+  @Override
+  public void configure(Context context) {
+    String key = FaultTolerance.RECOVERABLE_EXCEPTION_CLASSES;
+    if (context.getString(key) == null) {
+      context.put(key, "org.apache.solr.client.solrj.SolrServerException");
+    }
+    super.configure(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/UUIDInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/UUIDInterceptor.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/UUIDInterceptor.java
new file mode 100644
index 0000000..22d5347
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/UUIDInterceptor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.solr.morphline;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.interceptor.Interceptor;
+
+/**
+ * Flume Interceptor that sets a universally unique identifier on all events
+ * that are intercepted. By default this event header is named "id".
+ */
+public class UUIDInterceptor implements Interceptor {
+
+  public static final String HEADER_NAME = "headerName";
+  public static final String PRESERVE_EXISTING_NAME = "preserveExisting";
+  public static final String PREFIX_NAME = "prefix";
+
+  private final String headerName;
+  private final boolean preserveExisting;
+  private final String prefix;
+
+  protected UUIDInterceptor(Context context) {
+    headerName = context.getString(HEADER_NAME, "id");
+    preserveExisting = context.getBoolean(PRESERVE_EXISTING_NAME, true);
+    prefix = context.getString(PREFIX_NAME, "");
+  }
+
+  @Override
+  public void initialize() {
+  }
+
+  /** Returns the string that is prepended to every generated UUID. */
+  protected String getPrefix() {
+    return prefix;
+  }
+
+  /** Returns a new random UUID, prefixed with {@link #getPrefix()}. */
+  protected String generateUUID() {
+    return getPrefix() + UUID.randomUUID().toString();
+  }
+
+  /** Returns whether the given event should be tagged; this implementation accepts every event. */
+  protected boolean isMatch(Event event) {
+    return true;
+  }
+
+  /** Tags the event in place, unless an existing id header has to be preserved. */
+  @Override
+  public Event intercept(Event event) {
+    Map<String, String> headers = event.getHeaders();
+    boolean isKeepingExisting = preserveExisting && headers.containsKey(headerName);
+    if (!isKeepingExisting && isMatch(event)) {
+      headers.put(headerName, generateUUID());
+    }
+    return event;
+  }
+
+  @Override
+  public List<Event> intercept(List<Event> events) {
+    List<Event> tagged = new ArrayList<Event>(events.size());
+    for (Event candidate : events) {
+      // events for which intercept(Event) returns null are left out of the result
+      Event result = intercept(candidate);
+      if (result != null) {
+        tagged.add(result);
+      }
+    }
+    return tagged;
+  }
+
+  @Override
+  public void close() {
+  }
+
+  /** Builder implementations MUST have a public no-arg constructor */
+  public static class Builder implements Interceptor.Builder {
+
+    private Context context;
+
+    public Builder() {
+    }
+
+    @Override
+    public UUIDInterceptor build() {
+      return new UUIDInterceptor(context);
+    }
+
+    @Override
+    public void configure(Context context) {
+      this.context = context;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/EmbeddedSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/EmbeddedSource.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/EmbeddedSource.java
new file mode 100644
index 0000000..b30fece
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/EmbeddedSource.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.solr.morphline;
+
+import java.util.List;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.Sink;
+import org.apache.flume.source.AbstractSource;
+
+/**
+ * Test source that hands events to its channel processor and then invokes
+ * {@code process()} once on the sink supplied at construction time.
+ */
+class EmbeddedSource extends AbstractSource implements EventDrivenSource {
+
+  private Sink sink;
+
+  public EmbeddedSource(Sink sink) {
+    this.sink = sink;
+  }
+
+  /** Submits a single event, then triggers one sink processing pass. */
+  public void load(Event event) throws EventDeliveryException {
+    getChannelProcessor().processEvent(event);
+    runSink();
+  }
+
+  /** Submits a batch of events, then triggers one sink processing pass. */
+  public void load(List<Event> events) throws EventDeliveryException {
+    getChannelProcessor().processEventBatch(events);
+    runSink();
+  }
+
+  /** Invokes the sink exactly once; its return value is ignored. */
+  private void runSink() throws EventDeliveryException {
+    sink.process();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/FlumeHttpServletRequestWrapper.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/FlumeHttpServletRequestWrapper.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/FlumeHttpServletRequestWrapper.java
new file mode 100644
index 0000000..9711a3a
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/FlumeHttpServletRequestWrapper.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.solr.morphline;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.security.Principal;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Locale;
+import java.util.Map;
+
+import javax.servlet.RequestDispatcher;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpSession;
+
+/**
+ * Minimal {@link HttpServletRequest} stub for tests. It serves a fixed byte
+ * array as the request body via {@link #getInputStream()}, stores the
+ * character encoding, reports a {@code null} content type and exposes empty
+ * header-name and parameter-name enumerations. Every other method throws
+ * {@link UnsupportedOperationException}.
+ */
+class FlumeHttpServletRequestWrapper implements HttpServletRequest {
+
+  private final ServletInputStream stream;
+  private String charset;
+
+  /**
+   * @param data the bytes returned, one at a time, by the request input stream
+   */
+  public FlumeHttpServletRequestWrapper(final byte[] data) {
+    stream = new ServletInputStream() {
+      private final InputStream in = new ByteArrayInputStream(data);
+      @Override
+      public int read() throws IOException {
+        return in.read();
+      }
+    };
+  }
+
+  @Override
+  public String getAuthType() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Cookie[] getCookies() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public long getDateHeader(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getHeader(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Enumeration getHeaders(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Enumeration getHeaderNames() {
+    // Typed empty list instead of the raw Collections.EMPTY_LIST constant.
+    return Collections.enumeration(Collections.<String>emptyList());
+  }
+
+  @Override
+  public int getIntHeader(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getMethod() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getPathInfo() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getPathTranslated() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getContextPath() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getQueryString() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getRemoteUser() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public boolean isUserInRole(String role) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Principal getUserPrincipal() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getRequestedSessionId() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getRequestURI() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public StringBuffer getRequestURL() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getServletPath() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public HttpSession getSession(boolean create) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public HttpSession getSession() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public boolean isRequestedSessionIdValid() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public boolean isRequestedSessionIdFromCookie() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public boolean isRequestedSessionIdFromURL() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public boolean isRequestedSessionIdFromUrl() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Object getAttribute(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Enumeration getAttributeNames() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getCharacterEncoding() {
+    return charset;
+  }
+
+  @Override
+  public void setCharacterEncoding(String env) throws UnsupportedEncodingException {
+    this.charset = env;
+  }
+
+  @Override
+  public int getContentLength() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getContentType() {
+    return null;
+  }
+
+  @Override
+  public ServletInputStream getInputStream() throws IOException {
+    return stream;
+  }
+
+  @Override
+  public String getParameter(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Enumeration getParameterNames() {
+    // Typed empty list instead of the raw Collections.EMPTY_LIST constant.
+    return Collections.enumeration(Collections.<String>emptyList());
+  }
+
+  @Override
+  public String[] getParameterValues(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Map getParameterMap() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getProtocol() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getScheme() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getServerName() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public int getServerPort() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public BufferedReader getReader() throws IOException {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getRemoteAddr() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getRemoteHost() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public void setAttribute(String name, Object o) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public void removeAttribute(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Locale getLocale() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Enumeration getLocales() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public boolean isSecure() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public RequestDispatcher getRequestDispatcher(String path) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getRealPath(String path) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public int getRemotePort() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getLocalName() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getLocalAddr() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public int getLocalPort() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/ResettableTestStringInputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/ResettableTestStringInputStream.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/ResettableTestStringInputStream.java
new file mode 100644
index 0000000..e6ee9b9
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/ResettableTestStringInputStream.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.solr.morphline;
+
+import java.io.IOException;
+
+import org.apache.flume.serialization.ResettableInputStream;
+
+/**
+ * String-backed {@link ResettableInputStream} for tests. Each character of the
+ * string is handed out as one byte; only read, mark, reset and close are
+ * implemented.
+ */
+class ResettableTestStringInputStream extends ResettableInputStream {
+
+  private String str;
+  private int markPos = 0;
+  private int curPos = 0;
+
+  /**
+   * Warning: This test class does not handle character/byte conversion at all!
+   * @param str String to use for testing
+   */
+  public ResettableTestStringInputStream(String str) {
+    this.str = str;
+  }
+
+  @Override
+  public int readChar() throws IOException {
+    throw new UnsupportedOperationException("This test class doesn't return strings!");
+  }
+
+  /** Remembers the current read position. */
+  @Override
+  public void mark() throws IOException {
+    markPos = curPos;
+  }
+
+  /** Rewinds to the position recorded by the last mark() (initially 0). */
+  @Override
+  public void reset() throws IOException {
+    curPos = markPos;
+  }
+
+  @Override
+  public void seek(long position) throws IOException {
+    throw new UnsupportedOperationException("Unimplemented in test class");
+  }
+
+  @Override
+  public long tell() throws IOException {
+    throw new UnsupportedOperationException("Unimplemented in test class");
+  }
+
+  /** Returns the next character, or -1 once the string is exhausted. */
+  @Override
+  public int read() throws IOException {
+    if (curPos < str.length()) {
+      return str.charAt(curPos++);
+    }
+    return -1;
+  }
+
+  /**
+   * Copies up to {@code len} characters, each narrowed to a byte, into
+   * {@code b} starting at {@code off}.
+   *
+   * @return the number of bytes copied, or -1 if the string was already exhausted
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    int total = str.length();
+    if (curPos >= total) {
+      return -1;
+    }
+    int copied = 0;
+    for (; copied < len && curPos < total; copied++) {
+      b[off + copied] = (byte) str.charAt(curPos++);
+    }
+    return copied;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // no-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java
new file mode 100644
index 0000000..6172c68
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.solr.morphline;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.serialization.EventDeserializer;
+import org.apache.flume.serialization.EventDeserializerFactory;
+import org.apache.flume.serialization.ResettableInputStream;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Tests for {@link BlobDeserializer}: construction directly, via its builder
+ * and via {@link EventDeserializerFactory}, batch reads, and splitting of
+ * input that exceeds the configured maximum blob length.
+ */
+public class TestBlobDeserializer extends Assert {
+
+  private String mini;
+
+  @Before
+  public void setup() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("line 1\n");
+    sb.append("line 2\n");
+    mini = sb.toString();
+  }
+
+  @Test
+  public void testSimple() throws IOException {
+    ResettableInputStream in = new ResettableTestStringInputStream(mini);
+    EventDeserializer des = new BlobDeserializer(new Context(), in);
+    validateMiniParse(des);
+  }
+
+  @Test
+  public void testSimpleViaBuilder() throws IOException {
+    ResettableInputStream in = new ResettableTestStringInputStream(mini);
+    EventDeserializer.Builder builder = new BlobDeserializer.Builder();
+    EventDeserializer des = builder.build(new Context(), in);
+    validateMiniParse(des);
+  }
+
+  @Test
+  public void testSimpleViaFactory() throws IOException {
+    ResettableInputStream in = new ResettableTestStringInputStream(mini);
+    EventDeserializer des;
+    des = EventDeserializerFactory.getInstance(BlobDeserializer.Builder.class.getName(), new Context(), in);
+    validateMiniParse(des);
+  }
+
+  @Test
+  public void testBatch() throws IOException {
+    ResettableInputStream in = new ResettableTestStringInputStream(mini);
+    EventDeserializer des = new BlobDeserializer(new Context(), in);
+    List<Event> events;
+
+    events = des.readEvents(10); // try to read more than we should have
+    assertEquals(1, events.size());
+    assertEventBodyEquals(mini, events.get(0));
+
+    des.mark();
+    des.close();
+  }
+
+  // input longer than the configured maximum blob length is split at that boundary
+  @Test
+  public void testMaxLineLength() throws IOException {
+    String longLine = "abcdefghijklmnopqrstuvwxyz\n";
+    Context ctx = new Context();
+    ctx.put(BlobDeserializer.MAX_BLOB_LENGTH_KEY, "10");
+
+    ResettableInputStream in = new ResettableTestStringInputStream(longLine);
+    EventDeserializer des = new BlobDeserializer(ctx, in);
+
+    assertEventBodyEquals("abcdefghij", des.readEvent());
+    assertEventBodyEquals("klmnopqrst", des.readEvent());
+    assertEventBodyEquals("uvwxyz\n", des.readEvent());
+    assertNull(des.readEvent());
+  }
+
+  /** Asserts that the event body, decoded as UTF-8, equals {@code expected}. */
+  private void assertEventBodyEquals(String expected, Event event) {
+    String bodyStr = new String(event.getBody(), Charsets.UTF_8);
+    assertEquals(expected, bodyStr);
+  }
+
+  /**
+   * Reads the whole input as one event, resets to the mark, reads it again and
+   * finally checks that no further event is available.
+   */
+  private void validateMiniParse(EventDeserializer des) throws IOException {
+    Event evt;
+
+    des.mark();
+    evt = des.readEvent();
+    // Expected value first and explicit UTF-8, consistent with assertEventBodyEquals.
+    assertEventBodyEquals(mini, evt);
+    des.reset(); // reset!
+
+    evt = des.readEvent();
+    assertEquals("data should be repeated, " +
+        "because we reset() the stream", mini, new String(evt.getBody(), Charsets.UTF_8));
+
+    evt = des.readEvent();
+    assertNull("Event should be null because there are no lines " +
+        "left to read", evt);
+
+    des.mark();
+    des.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobHandler.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobHandler.java
new file mode 100644
index 0000000..3e7de99
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.solr.morphline;
+
+import java.util.List;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.flume.Event;
+import org.apache.flume.source.http.HTTPSourceHandler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that {@link BlobHandler} turns a request body into exactly one event
+ * with no headers and an unchanged body.
+ */
+public class TestBlobHandler extends Assert {
+
+  private HTTPSourceHandler handler;
+
+  @Before
+  public void setUp() {
+    handler = new BlobHandler();
+  }
+
+  @Test
+  public void testSingleEvent() throws Exception {
+    assertSingleHeaderlessEvent("foo");
+  }
+
+  @Test
+  public void testEmptyEvent() throws Exception {
+    assertSingleHeaderlessEvent("");
+  }
+
+  /** Sends {@code payload} as the request body and checks the single resulting event. */
+  private void assertSingleHeaderlessEvent(String payload) throws Exception {
+    byte[] body = payload.getBytes("UTF-8");
+    HttpServletRequest request = new FlumeHttpServletRequestWrapper(body);
+    List<Event> events = handler.getEvents(request);
+    assertEquals(1, events.size());
+    Event only = events.get(0);
+    assertEquals(0, only.getHeaders().size());
+    assertEquals(payload, new String(only.getBody(), "UTF-8"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/cf629841/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestEnvironment.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestEnvironment.java
new file mode 100644
index 0000000..5576d1d
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestEnvironment.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.solr.morphline;
+
+import java.net.UnknownHostException;
+
+import org.junit.Test;
+
+import com.cloudera.cdk.morphline.solr.EnvironmentTest;
+
+/** Print and verify some info about the environment in which the unit tests are running */
+public class TestEnvironment extends EnvironmentTest {
+
+  // Re-declared with @Test here; the body only delegates to EnvironmentTest.testEnvironment().
+  @Test
+  public void testEnvironment() throws UnknownHostException {
+    super.testEnvironment();
+  }
+  
+}