You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2015/02/25 18:52:26 UTC

flume git commit: FLUME-2633: Update Kite dependency to 1.0.0

Repository: flume
Updated Branches:
  refs/heads/trunk 4a91456a7 -> 407874b90


FLUME-2633: Update Kite dependency to 1.0.0

(Tom White via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/407874b9
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/407874b9
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/407874b9

Branch: refs/heads/trunk
Commit: 407874b9041b2073e3ccc8e0a18353f865d900ce
Parents: 4a91456
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Feb 25 09:51:59 2015 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Feb 25 09:51:59 2015 -0800

----------------------------------------------------------------------
 flume-ng-sinks/flume-dataset-sink/pom.xml              |  2 +-
 .../java/org/apache/flume/sink/kite/DatasetSink.java   | 13 ++++++-------
 .../apache/flume/sink/kite/policy/FailurePolicy.java   |  4 ++--
 .../org/apache/flume/sink/kite/policy/SavePolicy.java  |  5 ++++-
 .../org/apache/flume/sink/kite/TestDatasetSink.java    |  8 ++++----
 pom.xml                                                |  4 ++--
 6 files changed, 19 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/407874b9/flume-ng-sinks/flume-dataset-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/pom.xml b/flume-ng-sinks/flume-dataset-sink/pom.xml
index e929d60..ad3f603 100644
--- a/flume-ng-sinks/flume-dataset-sink/pom.xml
+++ b/flume-ng-sinks/flume-dataset-sink/pom.xml
@@ -93,7 +93,7 @@ limitations under the License.
 
     <dependency>
       <groupId>org.kitesdk</groupId>
-      <artifactId>kite-data-hcatalog</artifactId>
+      <artifactId>kite-data-hive</artifactId>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/407874b9/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index 3e66532..fd9f991 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -46,8 +46,9 @@ import org.kitesdk.data.DatasetDescriptor;
 import org.kitesdk.data.DatasetIOException;
 import org.kitesdk.data.DatasetNotFoundException;
 import org.kitesdk.data.DatasetWriter;
-import org.kitesdk.data.DatasetWriterException;
 import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Flushable;
+import org.kitesdk.data.Syncable;
 import org.kitesdk.data.View;
 import org.kitesdk.data.spi.Registration;
 import org.kitesdk.data.URIBuilder;
@@ -305,10 +306,10 @@ public class DatasetSink extends AbstractSink implements Configurable {
       if (commitOnBatch) {
         // Flush/sync before commiting. A failure here will result in rolling back
         // the transaction
-        if (syncOnBatch) {
-          writer.sync();
-        } else {
-          writer.flush();
+        if (syncOnBatch && writer instanceof Syncable) {
+          ((Syncable) writer).sync();
+        } else if (writer instanceof Flushable) {
+          ((Flushable) writer).flush();
         }
         boolean committed = commitTransaction();
         Preconditions.checkState(committed,
@@ -484,8 +485,6 @@ public class DatasetSink extends AbstractSink implements Configurable {
         throw new EventDeliveryException("Check HDFS permissions/health. IO"
             + " error trying to close the  writer for dataset " + datasetUri,
             ex);
-      } catch (DatasetWriterException ex) {
-        throw new EventDeliveryException("Failure moving temp file.", ex);
       } catch (RuntimeException ex) {
         throw new EventDeliveryException("Error trying to close the  writer for"
             + " dataset " + datasetUri, ex);

http://git-wip-us.apache.org/repos/asf/flume/blob/407874b9/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
index 47b6a25..f6f875a 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
@@ -22,7 +22,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.sink.kite.DatasetSink;
-import org.kitesdk.data.DatasetWriter;
+import org.kitesdk.data.Syncable;
 
 /**
  * A policy for dealing with non-recoverable event delivery failures.
@@ -68,7 +68,7 @@ public interface FailurePolicy {
    * This allows the policy implementation to sync any data that it may not
    * have fully handled.
    *
-   * See {@link DatasetWriter#sync()}.
+   * See {@link Syncable#sync()}.
    *
    * @throws EventDeliveryException The policy failed while syncing data.
    *                                When this is thrown, the Flume transaction

http://git-wip-us.apache.org/repos/asf/flume/blob/407874b9/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
index ed47898..bd537ec 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
@@ -30,6 +30,7 @@ import org.kitesdk.data.DatasetDescriptor;
 import org.kitesdk.data.DatasetWriter;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.Formats;
+import org.kitesdk.data.Syncable;
 import org.kitesdk.data.View;
 
 import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
@@ -87,7 +88,9 @@ public class SavePolicy implements FailurePolicy {
         // dataset
         close();
       } else {
-        writer.sync();
+        if (writer instanceof Syncable) {
+          ((Syncable) writer).sync();
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/407874b9/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
index 58aa467..621920d 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -78,12 +78,12 @@ import static org.mockito.Mockito.*;
 
 public class TestDatasetSink {
 
-  public static final String FILE_REPO_URI = "repo:file:target/test-repo";
+  public static final String FILE_REPO_URI = "repo:file:target/test_repo";
   public static final String DATASET_NAME = "test";
   public static final String FILE_DATASET_URI =
-      "dataset:file:target/test-repo/" + DATASET_NAME;
+      "dataset:file:target/test_repo/" + DATASET_NAME;
   public static final String ERROR_DATASET_URI =
-      "dataset:file:target/test-repo/failed-events";
+      "dataset:file:target/test_repo/failed_events";
   public static final File SCHEMA_FILE = new File("target/record-schema.avsc");
   public static final Schema RECORD_SCHEMA = new Schema.Parser().parse(
       "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
@@ -254,7 +254,7 @@ public class TestDatasetSink {
 
   @Test
   public void testPartitionedData() throws EventDeliveryException {
-    URI partitionedUri = URI.create("dataset:file:target/test-repo/partitioned");
+    URI partitionedUri = URI.create("dataset:file:target/test_repo/partitioned");
     try {
       Datasets.create(partitionedUri, new DatasetDescriptor.Builder(DESCRIPTOR)
           .partitionStrategy(new PartitionStrategy.Builder()

http://git-wip-us.apache.org/repos/asf/flume/blob/407874b9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ea7ffe3..3e40558 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,7 @@ limitations under the License.
     <elasticsearch.version>0.90.1</elasticsearch.version>
     <hadoop2.version>2.4.0</hadoop2.version>
     <thrift.version>0.7.0</thrift.version>
-    <kite.version>0.17.1</kite.version>
+    <kite.version>1.0.0</kite.version>
     <hive.version>0.13.1</hive.version>
     <xalan.verion>2.7.1</xalan.verion>
     <xerces.version>2.9.1</xerces.version>
@@ -1328,7 +1328,7 @@ limitations under the License.
       </dependency>
       <dependency>
         <groupId>org.kitesdk</groupId>
-        <artifactId>kite-data-hcatalog</artifactId>
+        <artifactId>kite-data-hive</artifactId>
         <version>${kite.version}</version>
       </dependency>
       <dependency>