You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/01/29 02:42:58 UTC

[1/2] flume git commit: FLUME-2591. DatasetSink 2.0

Repository: flume
Updated Branches:
  refs/heads/trunk 0d6eccad2 -> 1d49ef704


http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
index c46d66c..58aa467 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -18,6 +18,8 @@
 
 package org.apache.flume.sink.kite;
 
+import org.apache.flume.sink.kite.parser.EntityParser;
+import org.apache.flume.sink.kite.policy.FailurePolicy;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
@@ -29,6 +31,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -37,12 +40,14 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.util.Utf8;
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -52,6 +57,7 @@ import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -64,9 +70,11 @@ import org.junit.Test;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
 import org.kitesdk.data.DatasetReader;
+import org.kitesdk.data.DatasetWriter;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.PartitionStrategy;
 import org.kitesdk.data.View;
+import static org.mockito.Mockito.*;
 
 public class TestDatasetSink {
 
@@ -74,6 +82,8 @@ public class TestDatasetSink {
   public static final String DATASET_NAME = "test";
   public static final String FILE_DATASET_URI =
       "dataset:file:target/test-repo/" + DATASET_NAME;
+  public static final String ERROR_DATASET_URI =
+      "dataset:file:target/test-repo/failed-events";
   public static final File SCHEMA_FILE = new File("target/record-schema.avsc");
   public static final Schema RECORD_SCHEMA = new Schema.Parser().parse(
       "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
@@ -127,6 +137,7 @@ public class TestDatasetSink {
     Datasets.create(FILE_DATASET_URI, DESCRIPTOR);
 
     this.config = new Context();
+    config.put("keep-alive", "0");
     this.in = new MemoryChannel();
     Configurables.configure(in, config);
 
@@ -195,7 +206,7 @@ public class TestDatasetSink {
   }
 
   @Test
-  public void testFileStore() throws EventDeliveryException {
+  public void testFileStore() throws EventDeliveryException, NonRecoverableEventException, NonRecoverableEventException {
     DatasetSink sink = sink(in, config);
 
     // run the sink
@@ -222,6 +233,19 @@ public class TestDatasetSink {
     // run the sink
     sink.start();
     sink.process();
+
+    // the transaction should not commit during the call to process
+    assertThrows("Transaction should still be open", IllegalStateException.class,
+        new Callable() {
+          @Override
+          public Object call() throws EventDeliveryException {
+            in.getTransaction().begin();
+            return null;
+          }
+        });
+    // The records won't commit until the call to stop()
+    Assert.assertEquals("Should not have committed", 0, read(created).size());
+
     sink.stop();
 
     Assert.assertEquals(Sets.newHashSet(expected), read(created));
@@ -509,6 +533,376 @@ public class TestDatasetSink {
         expected.size() + 1, remaining(in));
   }
 
+  @Test
+  public void testFileStoreWithSavePolicy() throws EventDeliveryException {
+    if (Datasets.exists(ERROR_DATASET_URI)) {
+      Datasets.delete(ERROR_DATASET_URI);
+    }
+    config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
+        DatasetSinkConstants.SAVE_FAILURE_POLICY);
+    config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
+        ERROR_DATASET_URI);
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
+  public void testMissingSchemaWithSavePolicy() throws EventDeliveryException {
+    if (Datasets.exists(ERROR_DATASET_URI)) {
+      Datasets.delete(ERROR_DATASET_URI);
+    }
+    config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
+        DatasetSinkConstants.SAVE_FAILURE_POLICY);
+    config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
+        ERROR_DATASET_URI);
+    final DatasetSink sink = sink(in, config);
+
+    Event badEvent = new SimpleEvent();
+    badEvent.setHeaders(Maps.<String, String>newHashMap());
+    badEvent.setBody(serialize(expected.get(0), RECORD_SCHEMA));
+    putToChannel(in, badEvent);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals("Good records should have been written",
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+    Assert.assertEquals("Should not have rolled back", 0, remaining(in));
+    Assert.assertEquals("Should have saved the bad event",
+        Sets.newHashSet(AvroFlumeEvent.newBuilder()
+          .setBody(ByteBuffer.wrap(badEvent.getBody()))
+          .setHeaders(toUtf8Map(badEvent.getHeaders()))
+          .build()),
+        read(Datasets.load(ERROR_DATASET_URI, AvroFlumeEvent.class)));
+  }
+
+  @Test
+  public void testSerializedWithIncompatibleSchemasWithSavePolicy()
+      throws EventDeliveryException {
+    if (Datasets.exists(ERROR_DATASET_URI)) {
+      Datasets.delete(ERROR_DATASET_URI);
+    }
+    config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
+        DatasetSinkConstants.SAVE_FAILURE_POLICY);
+    config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
+        ERROR_DATASET_URI);
+    final DatasetSink sink = sink(in, config);
+
+    GenericRecordBuilder builder = new GenericRecordBuilder(
+        INCOMPATIBLE_SCHEMA);
+    GenericData.Record rec = builder.set("username", "koala").build();
+
+    // We pass in a valid schema in the header, but an incompatible schema
+    // was used to serialize the record
+    Event badEvent = event(rec, INCOMPATIBLE_SCHEMA, SCHEMA_FILE, true);
+    putToChannel(in, badEvent);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals("Good records should have been written",
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+    Assert.assertEquals("Should not have rolled back", 0, remaining(in));
+    Assert.assertEquals("Should have saved the bad event",
+        Sets.newHashSet(AvroFlumeEvent.newBuilder()
+          .setBody(ByteBuffer.wrap(badEvent.getBody()))
+          .setHeaders(toUtf8Map(badEvent.getHeaders()))
+          .build()),
+        read(Datasets.load(ERROR_DATASET_URI, AvroFlumeEvent.class)));
+  }
+
+  @Test
+  public void testSerializedWithIncompatibleSchemas() throws EventDeliveryException {
+    final DatasetSink sink = sink(in, config);
+
+    GenericRecordBuilder builder = new GenericRecordBuilder(
+        INCOMPATIBLE_SCHEMA);
+    GenericData.Record rec = builder.set("username", "koala").build();
+
+    // We pass in a valid schema in the header, but an incompatible schema
+    // was used to serialize the record
+    putToChannel(in, event(rec, INCOMPATIBLE_SCHEMA, SCHEMA_FILE, true));
+
+    // run the sink
+    sink.start();
+    assertThrows("Should fail", EventDeliveryException.class,
+        new Callable() {
+          @Override
+          public Object call() throws EventDeliveryException {
+            sink.process();
+            return null;
+          }
+        });
+    sink.stop();
+
+    Assert.assertEquals("Should have rolled back",
+        expected.size() + 1, remaining(in));
+  }
+
+  @Test
+  public void testCommitOnBatch() throws EventDeliveryException {
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // the transaction should commit during the call to process
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+    // but the data won't be visible yet
+    Assert.assertEquals(0,
+        read(Datasets.load(FILE_DATASET_URI)).size());
+
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+  }
+
+  @Test
+  public void testCommitOnBatchFalse() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(false));
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // the transaction should not commit during the call to process
+    assertThrows("Transaction should still be open", IllegalStateException.class,
+        new Callable() {
+          @Override
+          public Object call() throws EventDeliveryException {
+            in.getTransaction().begin();
+            return null;
+          }
+        });
+
+    // the data won't be visible
+    Assert.assertEquals(0,
+        read(Datasets.load(FILE_DATASET_URI)).size());
+
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+    // the transaction should commit during the call to stop
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
+  public void testCommitOnBatchFalseSyncOnBatchTrue() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(true));
+
+    try {
+      sink(in, config);
+      Assert.fail("Should have thrown IllegalArgumentException");
+    } catch (IllegalArgumentException ex) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testCloseAndCreateWriter() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(false));
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    sink.closeWriter();
+    sink.commitTransaction();
+    sink.createWriter();
+
+    Assert.assertNotNull("Writer should not be null", sink.getWriter());
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+  }
+
+  @Test
+  public void testCloseWriter() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(false));
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    sink.closeWriter();
+    sink.commitTransaction();
+
+    Assert.assertNull("Writer should be null", sink.getWriter());
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+  }
+
+  @Test
+  public void testCreateWriter() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(false));
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    sink.commitTransaction();
+    sink.createWriter();
+    Assert.assertNotNull("Writer should not be null", sink.getWriter());
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+
+    sink.stop();
+
+    Assert.assertEquals(0, read(Datasets.load(FILE_DATASET_URI)).size());
+  }
+
+  @Test
+  public void testAppendWriteExceptionInvokesPolicy()
+      throws EventDeliveryException, NonRecoverableEventException {
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // Mock an Event
+    Event mockEvent = mock(Event.class);
+    when(mockEvent.getBody()).thenReturn(new byte[] { 0x01 });
+
+    // Mock a GenericRecord
+    GenericRecord mockRecord = mock(GenericRecord.class);
+
+    // Mock an EntityParser
+    EntityParser<GenericRecord> mockParser = mock(EntityParser.class);
+    when(mockParser.parse(eq(mockEvent), any(GenericRecord.class)))
+        .thenReturn(mockRecord);
+    sink.setParser(mockParser);
+
+    // Mock a FailurePolicy
+    FailurePolicy mockFailurePolicy = mock(FailurePolicy.class);
+    sink.setFailurePolicy(mockFailurePolicy);
+
+    // Mock a DatasetWriter
+    DatasetWriter<GenericRecord> mockWriter = mock(DatasetWriter.class);
+    doThrow(new DataFileWriter.AppendWriteException(new IOException()))
+        .when(mockWriter).write(mockRecord);
+
+    sink.setWriter(mockWriter);
+    sink.write(mockEvent);
+
+    // Verify that the event was sent to the failure policy
+    verify(mockFailurePolicy).handle(eq(mockEvent), any(Throwable.class));
+
+    sink.stop();
+  }
+
+  @Test
+  public void testRuntimeExceptionThrowsEventDeliveryException()
+      throws EventDeliveryException, NonRecoverableEventException {
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // Mock an Event
+    Event mockEvent = mock(Event.class);
+    when(mockEvent.getBody()).thenReturn(new byte[] { 0x01 });
+
+    // Mock a GenericRecord
+    GenericRecord mockRecord = mock(GenericRecord.class);
+
+    // Mock an EntityParser
+    EntityParser<GenericRecord> mockParser = mock(EntityParser.class);
+    when(mockParser.parse(eq(mockEvent), any(GenericRecord.class)))
+        .thenReturn(mockRecord);
+    sink.setParser(mockParser);
+
+    // Mock a FailurePolicy
+    FailurePolicy mockFailurePolicy = mock(FailurePolicy.class);
+    sink.setFailurePolicy(mockFailurePolicy);
+
+    // Mock a DatasetWriter
+    DatasetWriter<GenericRecord> mockWriter = mock(DatasetWriter.class);
+    doThrow(new RuntimeException()).when(mockWriter).write(mockRecord);
+
+    sink.setWriter(mockWriter);
+
+    try {
+      sink.write(mockEvent);
+      Assert.fail("Should throw EventDeliveryException");
+    } catch (EventDeliveryException ex) {
+
+    }
+
+    // Verify that the event was not sent to the failure policy
+    verify(mockFailurePolicy, never()).handle(eq(mockEvent), any(Throwable.class));
+
+    sink.stop();
+  }
+
+  @Test
+  public void testProcessHandlesNullWriter() throws EventDeliveryException,
+      NonRecoverableEventException, NonRecoverableEventException {
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // explicitly set the writer to null
+    sink.setWriter(null);
+
+    // this should not throw an NPE
+    sink.process();
+
+    sink.stop();
+
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
   public static DatasetSink sink(Channel in, Context config) {
     DatasetSink sink = new DatasetSink();
     sink.setChannel(in);
@@ -621,4 +1015,19 @@ public class TestDatasetSink {
       Assert.assertEquals(message, expected, actual.getClass());
     }
   }
+
+  /**
+   * Helper function to convert a map of String to a map of Utf8.
+   *
+   * @param map A Map of String to String
+   * @return The same mappings converting the {@code String}s to {@link Utf8}s
+   */
+  public static Map<CharSequence, CharSequence> toUtf8Map(
+      Map<String, String> map) {
+    Map<CharSequence, CharSequence> utf8Map = Maps.newHashMap();
+    for (Map.Entry<String, String> entry : map.entrySet()) {
+      utf8Map.put(new Utf8(entry.getKey()), new Utf8(entry.getValue()));
+    }
+    return utf8Map;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7d4f01a..1350fa4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,7 +51,7 @@ limitations under the License.
 
     <hadoop2.version>2.4.0</hadoop2.version>
     <thrift.version>0.7.0</thrift.version>
-    <kite.version>0.15.0</kite.version>
+    <kite.version>0.17.1</kite.version>
     <hive.version>0.10.0</hive.version>
   </properties>
 


[2/2] flume git commit: FLUME-2591. DatasetSink 2.0

Posted by hs...@apache.org.
FLUME-2591. DatasetSink 2.0

(Joey Echeverria via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1d49ef70
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1d49ef70
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1d49ef70

Branch: refs/heads/trunk
Commit: 1d49ef704a8bb08280b4e653e6db94dc3d2c2475
Parents: 0d6ecca
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed Jan 28 17:41:51 2015 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Wed Jan 28 17:42:46 2015 -0800

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  62 +-
 .../org/apache/flume/sink/kite/DatasetSink.java | 650 ++++++++++++-------
 .../flume/sink/kite/DatasetSinkConstants.java   |  74 ++-
 .../sink/kite/NonRecoverableEventException.java |  54 ++
 .../flume/sink/kite/parser/AvroParser.java      | 208 ++++++
 .../flume/sink/kite/parser/EntityParser.java    |  56 ++
 .../sink/kite/parser/EntityParserFactory.java   |  82 +++
 .../flume/sink/kite/policy/FailurePolicy.java   | 105 +++
 .../sink/kite/policy/FailurePolicyFactory.java  |  82 +++
 .../flume/sink/kite/policy/RetryPolicy.java     |  63 ++
 .../flume/sink/kite/policy/SavePolicy.java      | 125 ++++
 .../apache/flume/sink/kite/TestDatasetSink.java | 411 +++++++++++-
 pom.xml                                         |   2 +-
 13 files changed, 1730 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index bcadc2d..7a1dfce 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2158,23 +2158,51 @@ Note 2: In some cases, file rolling may occur slightly after the roll interval
 has been exceeded. However, this delay will not exceed 5 seconds. In most
 cases, the delay is neglegible.
 
-=======================  =======  ===========================================================
-Property Name            Default  Description
-=======================  =======  ===========================================================
-**channel**              --
-**type**                 --       Must be org.apache.flume.sink.kite.DatasetSink
-**kite.dataset.uri**     --       URI of the dataset to open
-kite.repo.uri            --       URI of the repository to open
-                                  (deprecated; use kite.dataset.uri instead)
-kite.dataset.name        --       Name of the Dataset where records will be written
-                                  (deprecated; use kite.dataset.uri instead)
-kite.batchSize           100      Number of records to process in each batch
-kite.rollInterval        30       Maximum wait time (seconds) before data files are released
-auth.kerberosPrincipal   --       Kerberos user principal for secure authentication to HDFS
-auth.kerberosKeytab      --       Kerberos keytab location (local FS) for the principal
-auth.proxyUser           --       The effective user for HDFS actions, if different from
-                                  the kerberos principal
-=======================  =======  ===========================================================
+============================  =======  ===========================================================
+Property Name                 Default  Description
+============================  =======  ===========================================================
+**channel**                   --
+**type**                      --       Must be org.apache.flume.sink.kite.DatasetSink
+**kite.dataset.uri**          --       URI of the dataset to open
+kite.repo.uri                 --       URI of the repository to open
+                                       (deprecated; use kite.dataset.uri instead)
+kite.dataset.namespace        --       Namespace of the Dataset where records will be written
+                                       (deprecated; use kite.dataset.uri instead)
+kite.dataset.name             --       Name of the Dataset where records will be written
+                                       (deprecated; use kite.dataset.uri instead)
+kite.batchSize                100      Number of records to process in each batch
+kite.rollInterval             30       Maximum wait time (seconds) before data files are released
+kite.flushable.commitOnBatch  true     If ``true``, the Flume transaction will be committed and the
+                                       writer will be flushed on each batch of ``kite.batchSize``
+                                       records. This setting only applies to flushable datasets. When
+                                       ``true``, it's possible for temp files with committed data to be
+                                       left in the dataset directory. These files need to be recovered
+                                       by hand for the data to be visible to DatasetReaders.
+kite.syncable.syncOnBatch     true     Controls whether the sink will also sync data when committing
+                                       the transaction. This setting only applies to syncable datasets.
+                                       Syncing guarantees that data will be written on stable storage
+                                       on the remote system while flushing only guarantees that data
+                                       has left Flume's client buffers. When the
+                                       ``kite.flushable.commitOnBatch`` property is set to ``false``,
+                                       this property must also be set to ``false``.
+kite.entityParser             avro     Parser that turns Flume ``Events`` into Kite entities.
+                                       Valid values are ``avro`` and the fully-qualified class name
+                                       of an implementation of the ``EntityParser.Builder`` interface.
+kite.failurePolicy            retry    Policy that handles non-recoverable errors such as a missing
+                                       ``Schema`` in the ``Event`` header. The default value, ``retry``,
+                                       will fail the current batch and try again which matches the old
+                                       behavior. Other valid values are ``save``, which will write the
+                                       raw ``Event`` to the ``kite.error.dataset.uri`` dataset, and the
+                                       fully-qualified class name of an implementation of the
+                                       ``FailurePolicy.Builder`` interface.
+kite.error.dataset.uri        --       URI of the dataset where failed events are saved when
+                                       ``kite.failurePolicy`` is set to ``save``. **Required** when
+                                       the ``kite.failurePolicy`` is set to ``save``.
+auth.kerberosPrincipal        --       Kerberos user principal for secure authentication to HDFS
+auth.kerberosKeytab           --       Kerberos keytab location (local FS) for the principal
+auth.proxyUser                --       The effective user for HDFS actions, if different from
+                                       the kerberos principal
+============================  =======  ===========================================================
 
 
 Kafka Sink

http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index ebcc617..3e66532 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -15,31 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flume.sink.kite;
 
+import org.apache.flume.sink.kite.parser.EntityParserFactory;
+import org.apache.flume.sink.kite.parser.EntityParser;
+import org.apache.flume.sink.kite.policy.FailurePolicy;
+import org.apache.flume.sink.kite.policy.FailurePolicyFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.io.InputStream;
 import java.net.URI;
-import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -48,144 +40,180 @@ import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.DatasetException;
+import org.kitesdk.data.DatasetIOException;
+import org.kitesdk.data.DatasetNotFoundException;
 import org.kitesdk.data.DatasetWriter;
+import org.kitesdk.data.DatasetWriterException;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.View;
 import org.kitesdk.data.spi.Registration;
-import org.kitesdk.data.spi.URIBuilder;
+import org.kitesdk.data.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
+import org.kitesdk.data.Format;
+import org.kitesdk.data.Formats;
+
 /**
- * Experimental sink that writes events to a Kite Dataset. This sink will
- * deserialize the body of each incoming event and store the resulting record
- * in a Kite Dataset. It determines target Dataset by opening a repository URI,
- * {@code kite.repo.uri}, and loading a Dataset by name,
- * {@code kite.dataset.name}.
+ * Sink that writes events to a Kite Dataset. This sink will parse the body of
+ * each incoming event and store the resulting entity in a Kite Dataset. It
+ * determines the destination Dataset by opening a dataset URI
+ * {@code kite.dataset.uri} or opening a repository URI, {@code kite.repo.uri},
+ * and loading a Dataset by name, {@code kite.dataset.name}, and namespace,
+ * {@code kite.dataset.namespace}.
  */
 public class DatasetSink extends AbstractSink implements Configurable {
 
   private static final Logger LOG = LoggerFactory.getLogger(DatasetSink.class);
 
-  static Configuration conf = new Configuration();
+  private Context context = null;
+  private UserGroupInformation login = null;
 
   private String datasetName = null;
-  private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE;
-
-  private URI target = null;
-  private Schema targetSchema = null;
+  private URI datasetUri = null;
+  private Schema datasetSchema = null;
   private DatasetWriter<GenericRecord> writer = null;
-  private UserGroupInformation login = null;
-  private SinkCounter counter = null;
 
-  // for rolling files at a given interval
-  private int rollIntervalS = DatasetSinkConstants.DEFAULT_ROLL_INTERVAL;
-  private long lastRolledMs = 0l;
+  /**
+   * The number of events to process as a single batch.
+   */
+  private long batchSize = DEFAULT_BATCH_SIZE;
+
+  /**
+   * The number of seconds to wait before rolling a writer.
+   */
+  private int rollIntervalSeconds = DEFAULT_ROLL_INTERVAL;
+
+  /**
+   * Flag that says if Flume should commit on every batch.
+   */
+  private boolean commitOnBatch = DEFAULT_FLUSHABLE_COMMIT_ON_BATCH;
+
+  /**
+   * Flag that says if Flume should sync on every batch.
+   */
+  private boolean syncOnBatch = DEFAULT_SYNCABLE_SYNC_ON_BATCH;
+
+  /**
+   * The last time the writer rolled.
+   */
+  private long lastRolledMillis = 0l;
 
-  // for working with avro serialized records
-  private GenericRecord datum = null;
+  /**
+   * The raw number of bytes parsed.
+   */
+  private long bytesParsed = 0l;
+
+  /**
+   * A class for parsing Kite entities from Flume Events.
+   */
+  private EntityParser<GenericRecord> parser = null;
+
+  /**
+   * A class implementing a failure policy for events that had a
+   * non-recoverable error during processing.
+   */
+  private FailurePolicy failurePolicy = null;
+
+  private SinkCounter counter = null;
+
+  /**
+   * The Kite entity
+   */
+  private GenericRecord entity = null;
   // TODO: remove this after PARQUET-62 is released
-  private boolean reuseDatum = true;
-  private BinaryDecoder decoder = null;
-  private LoadingCache<Schema, DatumReader<GenericRecord>> readers =
-      CacheBuilder.newBuilder()
-      .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() {
-        @Override
-        public DatumReader<GenericRecord> load(Schema schema) {
-          // must use the target dataset's schema for reading to ensure the
-          // records are able to be stored using it
-          return new GenericDatumReader<GenericRecord>(
-            schema, targetSchema);
-        }
-      });
-  private static LoadingCache<String, Schema> schemasFromLiteral = CacheBuilder
-      .newBuilder()
-      .build(new CacheLoader<String, Schema>() {
-        @Override
-        public Schema load(String literal) {
-          Preconditions.checkNotNull(literal,
-              "Schema literal cannot be null without a Schema URL");
-          return new Schema.Parser().parse(literal);
-        }
-      });
-  private static LoadingCache<String, Schema> schemasFromURL = CacheBuilder
-      .newBuilder()
-      .build(new CacheLoader<String, Schema>() {
-        @Override
-        public Schema load(String url) throws IOException {
-          Schema.Parser parser = new Schema.Parser();
-          InputStream is = null;
-          try {
-            FileSystem fs = FileSystem.get(URI.create(url), conf);
-            if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) {
-              is = fs.open(new Path(url));
-            } else {
-              is = new URL(url).openStream();
-            }
-            return parser.parse(is);
-          } finally {
-            if (is != null) {
-              is.close();
-            }
-          }
-        }
-      });
+  private boolean reuseEntity = true;
+
+  /**
+   * The Flume transaction. Used to keep transactions open across calls to
+   * process.
+   */
+  private Transaction transaction = null;
+
+  /**
+   * Internal flag on if there has been a batch of records committed. This is
+   * used during rollback to know if the current writer needs to be closed.
+   */
+  private boolean committedBatch = false;
 
+  // Factories
+  private static final EntityParserFactory ENTITY_PARSER_FACTORY =
+      new EntityParserFactory();
+  private static final FailurePolicyFactory FAILURE_POLICY_FACTORY =
+      new FailurePolicyFactory();
+
+  /**
+   * Return the list of allowed formats.
+   * @return The list of allowed formats.
+   */
   protected List<String> allowedFormats() {
     return Lists.newArrayList("avro", "parquet");
   }
 
   @Override
   public void configure(Context context) {
+    this.context = context;
+
     // initialize login credentials
     this.login = KerberosUtil.login(
-        context.getString(DatasetSinkConstants.AUTH_PRINCIPAL),
-        context.getString(DatasetSinkConstants.AUTH_KEYTAB));
-    String effectiveUser =
-        context.getString(DatasetSinkConstants.AUTH_PROXY_USER);
+        context.getString(AUTH_PRINCIPAL),
+        context.getString(AUTH_KEYTAB));
+    String effectiveUser
+        = context.getString(AUTH_PROXY_USER);
     if (effectiveUser != null) {
       this.login = KerberosUtil.proxyAs(effectiveUser, login);
     }
 
-    String datasetURI = context.getString(
-      DatasetSinkConstants.CONFIG_KITE_DATASET_URI);
+    // Get the dataset URI and name from the context
+    String datasetURI = context.getString(CONFIG_KITE_DATASET_URI);
     if (datasetURI != null) {
-      this.target = URI.create(datasetURI);
-      this.datasetName = uriToName(target);
+      this.datasetUri = URI.create(datasetURI);
+      this.datasetName = uriToName(datasetUri);
     } else {
-      String repositoryURI = context.getString(
-        DatasetSinkConstants.CONFIG_KITE_REPO_URI);
-      Preconditions.checkNotNull(repositoryURI, "Repository URI is missing");
-      this.datasetName = context.getString(
-        DatasetSinkConstants.CONFIG_KITE_DATASET_NAME);
-      Preconditions.checkNotNull(datasetName, "Dataset name is missing");
-
-      this.target = new URIBuilder(repositoryURI, datasetName).build();
+      String repositoryURI = context.getString(CONFIG_KITE_REPO_URI);
+      Preconditions.checkNotNull(repositoryURI, "No dataset configured. Setting "
+          + CONFIG_KITE_DATASET_URI + " is required.");
+
+      this.datasetName = context.getString(CONFIG_KITE_DATASET_NAME);
+      Preconditions.checkNotNull(datasetName, "No dataset configured. Setting "
+          + CONFIG_KITE_DATASET_URI + " is required.");
+
+      String namespace = context.getString(CONFIG_KITE_DATASET_NAMESPACE,
+          DEFAULT_NAMESPACE);
+
+      this.datasetUri = new URIBuilder(repositoryURI, namespace, datasetName)
+          .build();
+    }
+    this.setName(datasetUri.toString());
+
+    if (context.getBoolean(CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        DEFAULT_SYNCABLE_SYNC_ON_BATCH)) {
+      Preconditions.checkArgument(
+          context.getBoolean(CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+              DEFAULT_FLUSHABLE_COMMIT_ON_BATCH), "Configuration error: "
+                  + CONFIG_FLUSHABLE_COMMIT_ON_BATCH + " must be set to true when "
+                  + CONFIG_SYNCABLE_SYNC_ON_BATCH + " is set to true.");
     }
 
-    this.setName(target.toString());
+    // Create the configured failure policy
+    this.failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context);
 
     // other configuration
-    this.batchSize = context.getLong(
-        DatasetSinkConstants.CONFIG_KITE_BATCH_SIZE,
-        DatasetSinkConstants.DEFAULT_BATCH_SIZE);
-    this.rollIntervalS = context.getInteger(
-        DatasetSinkConstants.CONFIG_KITE_ROLL_INTERVAL,
-        DatasetSinkConstants.DEFAULT_ROLL_INTERVAL);
+    this.batchSize = context.getLong(CONFIG_KITE_BATCH_SIZE,
+        DEFAULT_BATCH_SIZE);
+    this.rollIntervalSeconds = context.getInteger(CONFIG_KITE_ROLL_INTERVAL,
+        DEFAULT_ROLL_INTERVAL);
 
     this.counter = new SinkCounter(datasetName);
   }
 
   @Override
   public synchronized void start() {
-    this.lastRolledMs = System.currentTimeMillis();
+    this.lastRolledMillis = System.currentTimeMillis();
     counter.start();
     // signal that this sink is ready to process
     LOG.info("Started DatasetSink " + getName());
@@ -196,183 +224,359 @@ public class DatasetSink extends AbstractSink implements Configurable {
    * Causes the sink to roll at the next {@link #process()} call.
    */
   @VisibleForTesting
-  public void roll() {
-    this.lastRolledMs = 0l;
+  void roll() {
+    this.lastRolledMillis = 0l;
+  }
+
+  @VisibleForTesting
+  DatasetWriter<GenericRecord> getWriter() {
+    return writer;
+  }
+
+  @VisibleForTesting
+  void setWriter(DatasetWriter<GenericRecord> writer) {
+    this.writer = writer;
+  }
+
+  @VisibleForTesting
+  void setParser(EntityParser<GenericRecord> parser) {
+    this.parser = parser;
+  }
+
+  @VisibleForTesting
+  void setFailurePolicy(FailurePolicy failurePolicy) {
+    this.failurePolicy = failurePolicy;
   }
 
   @Override
   public synchronized void stop() {
     counter.stop();
 
-    if (writer != null) {
-      // any write problems invalidate the writer, which is immediately closed
-      writer.close();
-      this.writer = null;
-      this.lastRolledMs = System.currentTimeMillis();
+    try {
+      // Close the writer and commit the transaction, but don't create a new
+      // writer since we're stopping
+      closeWriter();
+      commitTransaction();
+    } catch (EventDeliveryException ex) {
+      rollbackTransaction();
+
+      LOG.warn("Closing the writer failed: " + ex.getLocalizedMessage());
+      LOG.debug("Exception follows.", ex);
+      // We don't propagate the exception as the transaction would have been
+      // rolled back and we can still finish stopping
     }
 
-    // signal that this sink has stopped
+  // signal that this sink has stopped
     LOG.info("Stopped dataset sink: " + getName());
     super.stop();
   }
 
   @Override
   public Status process() throws EventDeliveryException {
-    if (writer == null) {
-      try {
-        this.writer = newWriter(login, target);
-      } catch (DatasetException e) {
-        // DatasetException includes DatasetNotFoundException
-        throw new EventDeliveryException(
-          "Cannot write to " + getName(), e);
+    long processedEvents = 0;
+
+    try {
+      if (shouldRoll()) {
+        closeWriter();
+        commitTransaction();
+        createWriter();
       }
-    }
 
-    // handle file rolling
-    if ((System.currentTimeMillis() - lastRolledMs) / 1000 > rollIntervalS) {
-      // close the current writer and get a new one
-      writer.close();
-      this.writer = newWriter(login, target);
-      this.lastRolledMs = System.currentTimeMillis();
-      LOG.info("Rolled writer for " + getName());
-    }
+      // The writer shouldn't be null at this point
+      Preconditions.checkNotNull(writer,
+          "Can't process events with a null writer. This is likely a bug.");
+      Channel channel = getChannel();
 
-    Channel channel = getChannel();
-    Transaction transaction = null;
-    try {
-      long processedEvents = 0;
+      // Enter the transaction boundary if we haven't already
+      enterTransaction(channel);
 
-      transaction = channel.getTransaction();
-      transaction.begin();
       for (; processedEvents < batchSize; processedEvents += 1) {
         Event event = channel.take();
+
         if (event == null) {
           // no events available in the channel
           break;
         }
 
-        this.datum = deserialize(event, reuseDatum ? datum : null);
-
-        // writeEncoded would be an optimization in some cases, but HBase
-        // will not support it and partitioned Datasets need to get partition
-        // info from the entity Object. We may be able to avoid the
-        // serialization round-trip otherwise.
-        writer.write(datum);
+        write(event);
       }
 
-      // TODO: Add option to sync, depends on CDK-203
-      writer.flush();
-
-      // commit after data has been written and flushed
-      transaction.commit();
-
-      if (processedEvents == 0) {
-        counter.incrementBatchEmptyCount();
-        return Status.BACKOFF;
-      } else if (processedEvents < batchSize) {
-        counter.incrementBatchUnderflowCount();
-      } else {
-        counter.incrementBatchCompleteCount();
+      // commit transaction
+      if (commitOnBatch) {
+        // Flush/sync before committing. A failure here will result in rolling back
+        // the transaction
+        if (syncOnBatch) {
+          writer.sync();
+        } else {
+          writer.flush();
+        }
+        boolean committed = commitTransaction();
+        Preconditions.checkState(committed,
+            "Tried to commit a batch when there was no transaction");
+        committedBatch |= committed;
       }
-
-      counter.addToEventDrainSuccessCount(processedEvents);
-
-      return Status.READY;
-
     } catch (Throwable th) {
       // catch-all for any unhandled Throwable so that the transaction is
       // correctly rolled back.
-      if (transaction != null) {
+      rollbackTransaction();
+
+      if (commitOnBatch && committedBatch) {
         try {
-          transaction.rollback();
-        } catch (Exception ex) {
-          LOG.error("Transaction rollback failed", ex);
-          throw Throwables.propagate(ex);
+          closeWriter();
+        } catch (EventDeliveryException ex) {
+          LOG.warn("Error closing writer there may be temp files that need to"
+              + " be manually recovered: " + ex.getLocalizedMessage());
+          LOG.debug("Exception follows.", ex);
         }
+      } else {
+        this.writer = null;
       }
 
-      // close the writer and remove the its reference
-      writer.close();
-      this.writer = null;
-      this.lastRolledMs = System.currentTimeMillis();
-
       // handle the exception
       Throwables.propagateIfInstanceOf(th, Error.class);
       Throwables.propagateIfInstanceOf(th, EventDeliveryException.class);
       throw new EventDeliveryException(th);
-
-    } finally {
-      if (transaction != null) {
-        transaction.close();
-      }
     }
-  }
 
-  private DatasetWriter<GenericRecord> newWriter(
-    final UserGroupInformation login, final URI uri) {
-    View<GenericRecord> view = KerberosUtil.runPrivileged(login,
-      new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
-        @Override
-        public Dataset<GenericRecord> run() {
-          return Datasets.load(uri);
-        }
-      });
-
-    DatasetDescriptor descriptor = view.getDataset().getDescriptor();
-    String formatName = descriptor.getFormat().getName();
-    Preconditions.checkArgument(allowedFormats().contains(formatName),
-      "Unsupported format: " + formatName);
-
-    Schema newSchema = descriptor.getSchema();
-    if (targetSchema == null || !newSchema.equals(targetSchema)) {
-      this.targetSchema = descriptor.getSchema();
-      // target dataset schema has changed, invalidate all readers based on it
-      readers.invalidateAll();
+    if (processedEvents == 0) {
+      counter.incrementBatchEmptyCount();
+      return Status.BACKOFF;
+    } else if (processedEvents < batchSize) {
+      counter.incrementBatchUnderflowCount();
+    } else {
+      counter.incrementBatchCompleteCount();
     }
 
-    this.reuseDatum = !("parquet".equals(formatName));
-    this.datasetName = view.getDataset().getName();
+    counter.addToEventDrainSuccessCount(processedEvents);
 
-    return view.newWriter();
+    return Status.READY;
   }
 
   /**
-   * Not thread-safe.
+   * Parse the event using the entity parser and write the entity to the dataset.
    *
-   * @param event
-   * @param reuse
-   * @return
+   * @param event The event to write
+   * @throws EventDeliveryException An error occurred trying to write to the
+   *                                dataset that couldn't or shouldn't be
+   *                                handled by the failure policy.
    */
-  private GenericRecord deserialize(Event event, GenericRecord reuse)
-      throws EventDeliveryException {
-    decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
-    // no checked exception is thrown in the CacheLoader
-    DatumReader<GenericRecord> reader = readers.getUnchecked(schema(event));
+  @VisibleForTesting
+  void write(Event event) throws EventDeliveryException {
     try {
-      return reader.read(reuse, decoder);
-    } catch (IOException ex) {
-      throw new EventDeliveryException("Cannot deserialize event", ex);
+      this.entity = parser.parse(event, reuseEntity ? entity : null);
+      this.bytesParsed += event.getBody().length;
+
+      // writeEncoded would be an optimization in some cases, but HBase
+      // will not support it and partitioned Datasets need to get partition
+      // info from the entity Object. We may be able to avoid the
+      // serialization round-trip otherwise.
+      writer.write(entity);
+    } catch (NonRecoverableEventException ex) {
+      failurePolicy.handle(event, ex);
+    } catch (DataFileWriter.AppendWriteException ex) {
+      failurePolicy.handle(event, ex);
+    } catch (RuntimeException ex) {
+      Throwables.propagateIfInstanceOf(ex, EventDeliveryException.class);
+      throw new EventDeliveryException(ex);
     }
   }
 
-  private static Schema schema(Event event) throws EventDeliveryException {
-    Map<String, String> headers = event.getHeaders();
-    String schemaURL = headers.get(
-        DatasetSinkConstants.AVRO_SCHEMA_URL_HEADER);
+  /**
+   * Create a new writer.
+   *
+   * This method also re-loads the dataset so updates to the configuration or
+   * a dataset created after Flume starts will be loaded.
+   *
+   * @throws EventDeliveryException There was an error creating the writer.
+   */
+  @VisibleForTesting
+  void createWriter() throws EventDeliveryException {
+    // reset the committed flag whenever a new writer is created
+    committedBatch = false;
     try {
-      if (headers.get(DatasetSinkConstants.AVRO_SCHEMA_URL_HEADER) != null) {
-        return schemasFromURL.get(schemaURL);
-      } else {
-        return schemasFromLiteral.get(
-            headers.get(DatasetSinkConstants.AVRO_SCHEMA_LITERAL_HEADER));
+      View<GenericRecord> view = KerberosUtil.runPrivileged(login,
+          new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
+            @Override
+            public Dataset<GenericRecord> run() {
+              return Datasets.load(datasetUri);
+            }
+          });
+
+      DatasetDescriptor descriptor = view.getDataset().getDescriptor();
+      Format format = descriptor.getFormat();
+      Preconditions.checkArgument(allowedFormats().contains(format.getName()),
+          "Unsupported format: " + format.getName());
+
+      Schema newSchema = descriptor.getSchema();
+      if (datasetSchema == null || !newSchema.equals(datasetSchema)) {
+        this.datasetSchema = descriptor.getSchema();
+        // dataset schema has changed, create a new parser
+        parser = ENTITY_PARSER_FACTORY.newParser(datasetSchema, context);
       }
-    } catch (ExecutionException ex) {
-      throw new EventDeliveryException("Cannot get schema", ex.getCause());
+
+      this.reuseEntity = !(Formats.PARQUET.equals(format));
+
+      // TODO: Check that the format implements Flushable after CDK-863
+      // goes in. For now, just check that the Dataset is Avro format
+      this.commitOnBatch = context.getBoolean(CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+          DEFAULT_FLUSHABLE_COMMIT_ON_BATCH) && (Formats.AVRO.equals(format));
+
+      // TODO: Check that the format implements Syncable after CDK-863
+      // goes in. For now, just check that the Dataset is Avro format
+      this.syncOnBatch = context.getBoolean(CONFIG_SYNCABLE_SYNC_ON_BATCH,
+          DEFAULT_SYNCABLE_SYNC_ON_BATCH) && (Formats.AVRO.equals(format));
+
+      this.datasetName = view.getDataset().getName();
+
+      this.writer = view.newWriter();
+
+      // Reset the last rolled time and the metrics
+      this.lastRolledMillis = System.currentTimeMillis();
+      this.bytesParsed = 0l;
+    } catch (DatasetNotFoundException ex) {
+      throw new EventDeliveryException("Dataset " + datasetUri + " not found."
+          + " The dataset must be created before Flume can write to it.", ex);
+    } catch (RuntimeException ex) {
+      throw new EventDeliveryException("Error trying to open a new"
+          + " writer for dataset " + datasetUri, ex);
     }
   }
 
+  /**
+   * Return true if the sink should roll the writer.
+   *
+   * Currently, this is based on time since the last roll or if the current
+   * writer is null.
+   *
+   * @return True if and only if the sink should roll the writer
+   */
+  private boolean shouldRoll() {
+    long currentTimeMillis = System.currentTimeMillis();
+    long elapsedTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(
+        currentTimeMillis - lastRolledMillis);
+
+    LOG.debug("Current time: {}, lastRolled: {}, diff: {} sec",
+        new Object[] {currentTimeMillis, lastRolledMillis, elapsedTimeSeconds});
+
+    return elapsedTimeSeconds >= rollIntervalSeconds || writer == null;
+  }
+
+  /**
+   * Close the current writer.
+   *
+   * This method always sets the current writer to null even if close fails.
+   * If this method throws an Exception, callers *must* rollback any active
+   * transaction to ensure that data is replayed.
+   *
+   * @throws EventDeliveryException
+   */
+  @VisibleForTesting
+  void closeWriter() throws EventDeliveryException {
+    if (writer != null) {
+      try {
+        writer.close();
+
+        long elapsedTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(
+            System.currentTimeMillis() - lastRolledMillis);
+        LOG.info("Closed writer for {} after {} seconds and {} bytes parsed",
+            new Object[]{datasetUri, elapsedTimeSeconds, bytesParsed});
+      } catch (DatasetIOException ex) {
+        throw new EventDeliveryException("Check HDFS permissions/health. IO"
+            + " error trying to close the  writer for dataset " + datasetUri,
+            ex);
+      } catch (DatasetWriterException ex) {
+        throw new EventDeliveryException("Failure moving temp file.", ex);
+      } catch (RuntimeException ex) {
+        throw new EventDeliveryException("Error trying to close the  writer for"
+            + " dataset " + datasetUri, ex);
+      } finally {
+        // If we failed to close the writer then we give up on it as we'll
+        // end up throwing an EventDeliveryException which will result in
+        // a transaction rollback and a replay of any events written during
+        // the current transaction. If commitOnBatch is true, you can still
+        // end up with orphaned temp files that have data to be recovered.
+        this.writer = null;
+        failurePolicy.close();
+      }
+    }
+  }
+
+  /**
+   * Enter the transaction boundary. This will begin a new transaction if
+   * one doesn't already exist. If we're already in a transaction boundary,
+   * then this method does nothing.
+   *
+   * @param channel The Sink's channel
+   * @throws EventDeliveryException There was an error starting a new batch
+   *                                with the failure policy.
+   */
+  private void enterTransaction(Channel channel) throws EventDeliveryException {
+    // There's no synchronization around the transaction instance because the
+    // Sink API states "the Sink#process() call is guaranteed to only
+    // be accessed  by a single thread". Technically other methods could be
+    // called concurrently, but the implementation of SinkRunner waits
+    // for the Thread running process() to end before calling stop()
+    if (transaction == null) {
+      this.transaction = channel.getTransaction();
+      transaction.begin();
+      failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context);
+    }
+  }
+
+  /**
+   * Commit and close the transaction.
+   *
+   * If this method throws an Exception the caller *must* ensure that the
+   * transaction is rolled back. Callers can roll back the transaction by
+   * calling {@link #rollbackTransaction()}.
+   *
+   * @return True if there was an open transaction and it was committed, false
+   *         otherwise.
+   * @throws EventDeliveryException There was an error ending the batch with
+   *                                the failure policy.
+   */
+  @VisibleForTesting
+  boolean commitTransaction() throws EventDeliveryException {
+    if (transaction != null) {
+      failurePolicy.sync();
+      transaction.commit();
+      transaction.close();
+      this.transaction = null;
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Rollback the transaction. If there is a RuntimeException during rollback,
+   * it will be logged but the transaction instance variable will still be
+   * nullified.
+   */
+  private void rollbackTransaction() {
+    if (transaction != null) {
+      try {
+        // If the transaction wasn't committed before we got the exception, we
+        // need to rollback.
+          transaction.rollback();
+      } catch (RuntimeException ex) {
+        LOG.error("Transaction rollback failed: " + ex.getLocalizedMessage());
+        LOG.debug("Exception follows.", ex);
+      } finally {
+        transaction.close();
+        this.transaction = null;
+      }
+    }
+}
+
+  /**
+   * Get the name of the dataset from the URI
+   *
+   * @param uri The dataset or view URI
+   * @return The dataset name
+   */
   private static String uriToName(URI uri) {
     return Registration.lookupDatasetUri(URI.create(
-      uri.getRawSchemeSpecificPart())).second().get("dataset");
+        uri.getRawSchemeSpecificPart())).second().get("dataset");
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
index 3c67738..af33304 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
@@ -18,9 +18,11 @@
 
 package org.apache.flume.sink.kite;
 
+import org.kitesdk.data.URIBuilder;
+
 public class DatasetSinkConstants {
   /**
-   * URI of the Kite DatasetRepository.
+   * URI of the Kite Dataset
    */
   public static final String CONFIG_KITE_DATASET_URI = "kite.dataset.uri";
 
@@ -35,6 +37,13 @@ public class DatasetSinkConstants {
   public static final String CONFIG_KITE_DATASET_NAME = "kite.dataset.name";
 
   /**
+   * Namespace of the Kite Dataset to write into.
+   */
+  public static final String CONFIG_KITE_DATASET_NAMESPACE =
+      "kite.dataset.namespace";
+  public static final String DEFAULT_NAMESPACE = URIBuilder.NAMESPACE_DEFAULT;
+
+  /**
    * Number of records to process from the incoming channel per call to process.
    */
   public static final String CONFIG_KITE_BATCH_SIZE = "kite.batchSize";
@@ -47,7 +56,68 @@ public class DatasetSinkConstants {
   public static int DEFAULT_ROLL_INTERVAL = 30; // seconds
 
   /**
-   * Headers with avro schema information is expected.
+   * Flag for committing the Flume transaction on each batch for Flushable
+   * datasets. When set to false, Flume will only commit the transaction when
+   * roll interval has expired. Setting this to false requires enough space
+   * in the channel to handle all events delivered during the roll interval.
+   * Defaults to true.
+   */
+  public static final String CONFIG_FLUSHABLE_COMMIT_ON_BATCH =
+      "kite.flushable.commiteOnBatch";
+  public static boolean DEFAULT_FLUSHABLE_COMMIT_ON_BATCH = true;
+
+  /**
+   * Flag for syncing the DatasetWriter on each batch for Syncable
+   * datasets. Defaults to true.
+   */
+  public static final String CONFIG_SYNCABLE_SYNC_ON_BATCH =
+      "kite.syncable.syncOnBatch";
+  public static boolean DEFAULT_SYNCABLE_SYNC_ON_BATCH = true;
+
+  /**
+   * Parser used to parse Flume Events into Kite entities.
+   */
+  public static final String CONFIG_ENTITY_PARSER = "kite.entityParser";
+
+  /**
+   * Built-in entity parsers
+   */
+  public static final String AVRO_ENTITY_PARSER = "avro";
+  public static final String DEFAULT_ENTITY_PARSER = AVRO_ENTITY_PARSER;
+  public static final String[] AVAILABLE_PARSERS = new String[] {
+    AVRO_ENTITY_PARSER
+  };
+
+  /**
+   * Policy used to handle non-recoverable failures.
+   */
+  public static final String CONFIG_FAILURE_POLICY = "kite.failurePolicy";
+
+  /**
+   * Write non-recoverable Flume events to a Kite dataset.
+   */
+  public static final String SAVE_FAILURE_POLICY = "save";
+
+  /**
+   * The URI to write non-recoverable Flume events to in the case of an error.
+   * If the dataset doesn't exist, it will be created.
+   */
+  public static final String CONFIG_KITE_ERROR_DATASET_URI =
+      "kite.error.dataset.uri";
+
+  /**
+   * Retry non-recoverable Flume events. This will lead to a never ending cycle
+   * of failure, but matches the previous default semantics of the DatasetSink.
+   */
+  public static final String RETRY_FAILURE_POLICY = "retry";
+  public static final String DEFAULT_FAILURE_POLICY = RETRY_FAILURE_POLICY;
+  public static final String[] AVAILABLE_POLICIES = new String[] {
+    RETRY_FAILURE_POLICY,
+    SAVE_FAILURE_POLICY
+  };
+
+  /**
+   * Headers where avro schema information is expected.
    */
   public static final String AVRO_SCHEMA_LITERAL_HEADER =
       "flume.avro.schema.literal";

http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java
new file mode 100644
index 0000000..8f6c0ae
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kite;
+
+
+/**
+ * A non-recoverable error trying to deliver the event.
+ *
+ * Non-recoverable event delivery failures include:
+ * 1. An error parsing the event body, thrown from an
+ *    {@link org.apache.flume.sink.kite.parser.EntityParser}.
+ * 2. A schema mismatch between the schema of an event and the schema of the
+ *    destination dataset.
+ * 3. A missing schema from the Event header when using the
+ *    {@link org.apache.flume.sink.kite.parser.AvroParser}.
+ */
+public class NonRecoverableEventException extends Exception {
+
+  private static final long serialVersionUID = 3485151222482254285L;
+  /** Creates an exception with no detail message and no cause. */
+  public NonRecoverableEventException() {
+    super();
+  }
+  /** Creates an exception with the given detail message. */
+  public NonRecoverableEventException(String message) {
+    super(message);
+  }
+  /** Creates an exception with the given detail message and cause. */
+  public NonRecoverableEventException(String message, Throwable t) {
+    super(message, t);
+  }
+  /** Creates an exception that wraps the given cause. */
+  public NonRecoverableEventException(Throwable t) {
+    super(t);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java
new file mode 100644
index 0000000..7c6a723
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kite.parser;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URL;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.kite.NonRecoverableEventException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
+
+/**
+ * An {@link EntityParser} that parses Avro serialized bytes from an event.
+ * 
+ * The Avro schema used to serialize the data should be set as either a URL
+ * or literal in the flume.avro.schema.url or flume.avro.schema.literal event
+ * headers respectively.
+ */
+public class AvroParser implements EntityParser<GenericRecord> {
+
+  static Configuration conf = new Configuration();
+
+  /**
+   * A cache of literal schemas to avoid re-parsing the schema.
+   */
+  private static final LoadingCache<String, Schema> schemasFromLiteral =
+      CacheBuilder.newBuilder()
+      .build(new CacheLoader<String, Schema>() {
+        @Override
+        public Schema load(String literal) {
+          Preconditions.checkNotNull(literal,
+              "Schema literal cannot be null without a Schema URL");
+          return new Schema.Parser().parse(literal);
+        }
+      });
+
+  /**
+   * A cache of schemas retrieved by URL to avoid re-parsing the schema.
+   */
+  private static final LoadingCache<String, Schema> schemasFromURL =
+      CacheBuilder.newBuilder()
+      .build(new CacheLoader<String, Schema>() {
+        @Override
+        public Schema load(String url) throws IOException {
+          Schema.Parser parser = new Schema.Parser();
+          InputStream is = null;
+          try {
+            FileSystem fs = FileSystem.get(URI.create(url), conf);
+            if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) {
+              is = fs.open(new Path(url));
+            } else {
+              is = new URL(url).openStream();
+            }
+            return parser.parse(is);
+          } finally {
+            if (is != null) {
+              is.close();
+            }
+          }
+        }
+      });
+
+  /**
+   * The schema of the destination dataset.
+   * 
+   * Used as the reader schema during parsing.
+   */
+  private final Schema datasetSchema;
+
+  /**
+   * A cache of DatumReaders per schema.
+   */
+  private final LoadingCache<Schema, DatumReader<GenericRecord>> readers =
+      CacheBuilder.newBuilder()
+          .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() {
+            @Override
+            public DatumReader<GenericRecord> load(Schema schema) {
+              // must use the target dataset's schema for reading to ensure the
+              // records are able to be stored using it
+              return new GenericDatumReader<GenericRecord>(
+                  schema, datasetSchema);
+            }
+          });
+
+  /**
+   * The binary decoder to reuse for event parsing.
+   */
+  private BinaryDecoder decoder = null;
+
+  /**
+   * Create a new AvroParser given the schema of the destination dataset.
+   * 
+   * @param datasetSchema The schema of the destination dataset.
+   */
+  private AvroParser(Schema datasetSchema) {
+    this.datasetSchema = datasetSchema;
+  }
+
+  /**
+   * Parse the entity from the body of the given event.
+   * 
+   * @param event The event to parse.
+   * @param reuse If non-null, this may be reused and returned from this method.
+   * @return The parsed entity as a GenericRecord.
+   * @throws EventDeliveryException A recoverable error such as an error
+   *                                downloading the schema from the URL has
+   *                                occurred.
+   * @throws NonRecoverableEventException A non-recoverable error such as an
+   *                                      unparsable schema or entity has
+   *                                      occurred.
+   */
+  @Override
+  public GenericRecord parse(Event event, GenericRecord reuse)
+      throws EventDeliveryException, NonRecoverableEventException {
+    decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
+
+    try {
+      DatumReader<GenericRecord> reader = readers.getUnchecked(schema(event));
+      return reader.read(reuse, decoder);
+    } catch (IOException ex) {
+      throw new NonRecoverableEventException("Cannot deserialize event", ex);
+    } catch (RuntimeException ex) {
+      throw new NonRecoverableEventException("Cannot deserialize event", ex);
+    }
+  }
+
+  /**
+   * Get the schema from the event headers.
+   * 
+   * @param event The Flume event
+   * @return The schema for the event
+   * @throws EventDeliveryException A recoverable error such as an error
+   *                                downloading the schema from the URL has
+   *                                occurred.
+   * @throws NonRecoverableEventException A non-recoverable error such as an
+   *                                      unparsable schema has occurred.
+   */
+  private static Schema schema(Event event) throws EventDeliveryException,
+      NonRecoverableEventException {
+    Map<String, String> headers = event.getHeaders();
+    String schemaURL = headers.get(AVRO_SCHEMA_URL_HEADER);
+    try {
+      if (schemaURL != null) {
+        return schemasFromURL.get(schemaURL);
+      } else {
+        String schemaLiteral = headers.get(AVRO_SCHEMA_LITERAL_HEADER);
+        if (schemaLiteral == null) {
+          throw new NonRecoverableEventException("No schema in event headers."
+              + " Headers must include either " + AVRO_SCHEMA_URL_HEADER
+              + " or " + AVRO_SCHEMA_LITERAL_HEADER);
+        }
+
+        return schemasFromLiteral.get(schemaLiteral);
+      }
+    } catch (ExecutionException ex) {
+      throw new EventDeliveryException("Cannot get schema", ex.getCause());
+    } catch (UncheckedExecutionException ex) {
+      throw new NonRecoverableEventException("Cannot parse schema",
+          ex.getCause());
+    }
+  }
+
+  public static class Builder implements EntityParser.Builder<GenericRecord> {
+
+    @Override
+    public EntityParser<GenericRecord> build(Schema datasetSchema, Context config) {
+      return new AvroParser(datasetSchema);
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParser.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParser.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParser.java
new file mode 100644
index 0000000..f2051a2
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParser.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kite.parser;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.avro.Schema;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.kite.NonRecoverableEventException;
+/** Parses Kite entities of type {@code E} from Flume {@link Event}s. */
+@NotThreadSafe
+public interface EntityParser<E> {
+
+  /**
+   * Parse a Kite entity from a Flume event
+   *
+   * @param event The event to parse
+   * @param reuse If non-null, this may be reused and returned
+   * @return The parsed entity
+   * @throws EventDeliveryException A recoverable error during parsing. Parsing
+   *                                can be safely retried.
+   * @throws NonRecoverableEventException A non-recoverable error during
+   *                                      parsing. Retrying will not make the
+   *                                      event parsable.
+   */
+  public E parse(Event event, E reuse) throws EventDeliveryException,
+      NonRecoverableEventException;
+
+  /**
+   * Knows how to build {@code EntityParser}s. Implementers must provide a
+   * no-arg constructor.
+   *
+   * @param <E> The type of entities generated
+   */
+  public static interface Builder<E> {
+    /** Builds a parser for the dataset schema and Flume configuration. */
+    public EntityParser<E> build(Schema datasetSchema, Context config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java
new file mode 100644
index 0000000..cfb7349
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kite.parser;
+
+import java.util.Arrays;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flume.Context;
+
+import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
+
+/** Creates {@link EntityParser}s according to the sink configuration. */
+public class EntityParserFactory {
+  /** Builds the parser named by CONFIG_ENTITY_PARSER (default: avro). */
+  public EntityParser<GenericRecord> newParser(Schema datasetSchema, Context config) {
+    EntityParser<GenericRecord> parser;
+
+    String parserType = config.getString(CONFIG_ENTITY_PARSER,
+        DEFAULT_ENTITY_PARSER);
+
+    if (parserType.equals(AVRO_ENTITY_PARSER)) {
+      parser = new AvroParser.Builder().build(datasetSchema, config);
+    } else {
+      // any other value must name a class implementing EntityParser.Builder
+      Class<? extends EntityParser.Builder> builderClass;
+      Class c;
+      try {
+        c = Class.forName(parserType);
+      } catch (ClassNotFoundException ex) {
+        throw new IllegalArgumentException("EntityParser.Builder class "
+            + parserType + " not found. Must set " + CONFIG_ENTITY_PARSER
+            + " to a class that implements EntityParser.Builder or to a builtin"
+            + " parser: " + Arrays.toString(AVAILABLE_PARSERS), ex);
+      }
+
+      if (c != null && EntityParser.Builder.class.isAssignableFrom(c)) {
+        builderClass = c;
+      } else {
+        throw new IllegalArgumentException("Class " + parserType + " does not"
+            + " implement EntityParser.Builder. Must set "
+            + CONFIG_ENTITY_PARSER + " to a class that extends"
+            + " EntityParser.Builder or to a builtin parser: "
+            + Arrays.toString(AVAILABLE_PARSERS));
+      }
+      // the builder is created reflectively through its no-arg constructor
+      EntityParser.Builder<GenericRecord> builder;
+      try {
+        builder = builderClass.newInstance();
+      } catch (InstantiationException ex) {
+        throw new IllegalArgumentException("Can't instantiate class "
+            + parserType + ". Must set " + CONFIG_ENTITY_PARSER + " to a class"
+            + " that extends EntityParser.Builder or to a builtin parser: "
+            + Arrays.toString(AVAILABLE_PARSERS), ex);
+      } catch (IllegalAccessException ex) {
+        throw new IllegalArgumentException("Can't instantiate class "
+            + parserType + ". Must set " + CONFIG_ENTITY_PARSER + " to a class"
+            + " that extends EntityParser.Builder or to a builtin parser: "
+            + Arrays.toString(AVAILABLE_PARSERS), ex);
+      }
+
+      parser = builder.build(datasetSchema, config);
+    }
+
+    return parser;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
new file mode 100644
index 0000000..47b6a25
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kite.policy;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.kite.DatasetSink;
+import org.kitesdk.data.DatasetWriter;
+
+/**
+ * A policy for dealing with non-recoverable event delivery failures.
+ *
+ * Non-recoverable event delivery failures include:
+ * 1. An error parsing the event body, thrown from an
+ *    {@link org.apache.flume.sink.kite.parser.EntityParser}.
+ * 2. A schema mismatch between the schema of an event and the schema of the
+ *    destination dataset.
+ * 3. A missing schema from the Event header when using the
+ *    {@link org.apache.flume.sink.kite.parser.AvroParser}.
+ *
+ * The life cycle of a FailurePolicy mimics the life cycle of the
+ * {@link DatasetSink#writer}:
+ *
+ * 1. When a new writer is created, the policy will be instantiated.
+ * 2. As Event failures happen,
+ *    {@link #handle(org.apache.flume.Event, java.lang.Throwable)} will be
+ *    called to let the policy handle the failure.
+ * 3. If the {@link DatasetSink} is configured to commit on batch, then the
+ *    {@link #sync()} method will be called when the batch is committed.
+ * 4. When the writer is closed, the policy's {@link #close()} method will be
+ *    called.
+ */
+public interface FailurePolicy {
+
+  /**
+   * Handle a non-recoverable event.
+   *
+   * @param event The event
+   * @param cause The cause of the failure
+   * @throws EventDeliveryException The policy failed to handle the event. When
+   *                                this is thrown, the Flume transaction will
+   *                                be rolled back and the event will be retried
+   *                                along with the rest of the batch.
+   */
+  public void handle(Event event, Throwable cause)
+      throws EventDeliveryException;
+
+  /**
+   * Ensure any handled events are on stable storage.
+   *
+   * This allows the policy implementation to sync any data that it may not
+   * have fully handled.
+   *
+   * See {@link DatasetWriter#sync()}.
+   *
+   * @throws EventDeliveryException The policy failed while syncing data.
+   *                                When this is thrown, the Flume transaction
+   *                                will be rolled back and the batch will be
+   *                                retried.
+   */
+  public void sync() throws EventDeliveryException;
+
+  /**
+   * Close this FailurePolicy and release any resources.
+   *
+   * @throws EventDeliveryException The policy failed while closing resources.
+   *                                When this is thrown, the Flume transaction
+   *                                will be rolled back and the batch will be
+   *                                retried.
+   */
+  public void close() throws EventDeliveryException;
+
+  /**
+   * Knows how to build {@code FailurePolicy}s. Implementers must provide a
+   * no-arg constructor.
+   */
+  public static interface Builder {
+
+    /**
+     * Build a new {@code FailurePolicy}
+     *
+     * @param config The Flume configuration context
+     * @return The {@code FailurePolicy}
+     */
+    FailurePolicy build(Context config);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
new file mode 100644
index 0000000..a8b2008
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kite.policy;
+
+import java.util.Arrays;
+import org.apache.flume.Context;
+
+import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
+
+/** Creates {@link FailurePolicy}s according to the sink configuration. */
+public class FailurePolicyFactory {
+  /** Builds the policy named by CONFIG_FAILURE_POLICY (default: retry). */
+  public FailurePolicy newPolicy(Context config) {
+    FailurePolicy policy;
+
+    String policyType = config.getString(CONFIG_FAILURE_POLICY,
+        DEFAULT_FAILURE_POLICY);
+
+    if (policyType.equals(RETRY_FAILURE_POLICY)) {
+      policy = new RetryPolicy.Builder().build(config);
+    } else if (policyType.equals(SAVE_FAILURE_POLICY)) {
+      policy = new SavePolicy.Builder().build(config);
+    } else {
+      // any other value must name a class implementing FailurePolicy.Builder
+      Class<? extends FailurePolicy.Builder> builderClass;
+      Class c;
+      try {
+        c = Class.forName(policyType);
+      } catch (ClassNotFoundException ex) {
+        throw new IllegalArgumentException("FailurePolicy.Builder class "
+            + policyType + " not found. Must set " + CONFIG_FAILURE_POLICY
+            + " to a class that implements FailurePolicy.Builder or to a builtin"
+            + " policy: " + Arrays.toString(AVAILABLE_POLICIES), ex);
+      }
+
+      if (c != null && FailurePolicy.Builder.class.isAssignableFrom(c)) {
+        builderClass = c;
+      } else {
+        throw new IllegalArgumentException("Class " + policyType + " does not"
+            + " implement FailurePolicy.Builder. Must set "
+            + CONFIG_FAILURE_POLICY + " to a class that extends"
+            + " FailurePolicy.Builder or to a builtin policy: "
+            + Arrays.toString(AVAILABLE_POLICIES));
+      }
+      // the builder is created reflectively through its no-arg constructor
+      FailurePolicy.Builder builder;
+      try {
+        builder = builderClass.newInstance();
+      } catch (InstantiationException ex) {
+        throw new IllegalArgumentException("Can't instantiate class "
+            + policyType + ". Must set " + CONFIG_FAILURE_POLICY + " to a class"
+            + " that extends FailurePolicy.Builder or to a builtin policy: "
+            + Arrays.toString(AVAILABLE_POLICIES), ex);
+      } catch (IllegalAccessException ex) {
+        throw new IllegalArgumentException("Can't instantiate class "
+            + policyType + ". Must set " + CONFIG_FAILURE_POLICY + " to a class"
+            + " that extends FailurePolicy.Builder or to a builtin policy: "
+            + Arrays.toString(AVAILABLE_POLICIES), ex);
+      }
+
+      policy = builder.build(config);
+    }
+   
+    return policy;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/RetryPolicy.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/RetryPolicy.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/RetryPolicy.java
new file mode 100644
index 0000000..9a4991c
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/RetryPolicy.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kite.policy;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A failure policy that logs the error and then forces a retry by throwing
+ * {@link EventDeliveryException}.
+ */
+public class RetryPolicy implements FailurePolicy {
+  private static final Logger LOG = LoggerFactory.getLogger(RetryPolicy.class);
+  /** Private: instances are obtained from {@link Builder}. */
+  private RetryPolicy() {
+  }
+  /** Logs the cause, then always throws it wrapped to force a retry. */
+  @Override
+  public void handle(Event event, Throwable cause) throws EventDeliveryException {
+    LOG.error("Event delivery failed: " + cause.getLocalizedMessage());
+    LOG.debug("Exception follows.", cause);
+
+    throw new EventDeliveryException(cause);
+  }
+
+  @Override
+  public void sync() throws EventDeliveryException {
+    // do nothing
+  }
+
+  @Override
+  public void close() throws EventDeliveryException {
+    // do nothing
+  }
+  /** Builds {@link RetryPolicy} instances; the configuration is not used. */
+  public static class Builder implements FailurePolicy.Builder {
+
+    @Override
+    public FailurePolicy build(Context config) {
+      return new RetryPolicy();
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
new file mode 100644
index 0000000..ed47898
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kite.policy;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.DatasetWriter;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Formats;
+import org.kitesdk.data.View;
+
+import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
+
+/**
+ * A failure policy that writes the raw Flume event to a Kite dataset.
+ */
+public class SavePolicy implements FailurePolicy {
+
+  private final View<AvroFlumeEvent> dataset;
+  private DatasetWriter<AvroFlumeEvent> writer;
+  private int nEventsHandled;
+
+  private SavePolicy(Context context) {
+    String uri = context.getString(CONFIG_KITE_ERROR_DATASET_URI);
+    Preconditions.checkArgument(uri != null, "Must set "
+        + CONFIG_KITE_ERROR_DATASET_URI + " when " + CONFIG_FAILURE_POLICY
+        + "=save");
+    if (Datasets.exists(uri)) {
+      dataset = Datasets.load(uri, AvroFlumeEvent.class);
+    } else {
+      DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+          .schema(AvroFlumeEvent.class)
+          .build();
+      dataset = Datasets.create(uri, descriptor, AvroFlumeEvent.class);
+    }
+
+    nEventsHandled = 0;
+  }
+
+  @Override
+  public void handle(Event event, Throwable cause) throws EventDeliveryException {
+    try {
+      if (writer == null) {
+        writer = dataset.newWriter();
+      }
+
+      final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+      avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
+      avroEvent.setHeaders(toCharSeqMap(event.getHeaders()));
+
+      writer.write(avroEvent);
+      nEventsHandled++;
+    } catch (RuntimeException ex) {
+      throw new EventDeliveryException(ex);
+    }
+  }
+
+  @Override
+  public void sync() throws EventDeliveryException {
+    if (nEventsHandled > 0) {
+      if (Formats.PARQUET.equals(
+          dataset.getDataset().getDescriptor().getFormat())) {
+        // We need to close the writer on sync if we're writing to a Parquet
+        // dataset
+        close();
+      } else {
+        writer.sync();
+      }
+    }
+  }
+
+  @Override
+  public void close() throws EventDeliveryException {
+    if (nEventsHandled > 0) {
+      try {
+        writer.close();
+      } catch (RuntimeException ex) {
+        throw new EventDeliveryException(ex);
+      } finally {
+        writer = null;
+        nEventsHandled = 0;
+      }
+    }
+  }
+
+  /**
+   * Helper function to convert a map of String to a map of CharSequence.
+   */
+  private static Map<CharSequence, CharSequence> toCharSeqMap(
+      Map<String, String> map) {
+    return Maps.<CharSequence, CharSequence>newHashMap(map);
+  }
+
+  public static class Builder implements FailurePolicy.Builder {
+
+    @Override
+    public FailurePolicy build(Context config) {
+      return new SavePolicy(config);
+    }
+
+  }
+}