You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2015/02/25 18:52:26 UTC
flume git commit: FLUME-2633: Update Kite dependency to 1.0.0
Repository: flume
Updated Branches:
refs/heads/trunk 4a91456a7 -> 407874b90
FLUME-2633: Update Kite dependency to 1.0.0
(Tom White via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/407874b9
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/407874b9
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/407874b9
Branch: refs/heads/trunk
Commit: 407874b9041b2073e3ccc8e0a18353f865d900ce
Parents: 4a91456
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Feb 25 09:51:59 2015 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Feb 25 09:51:59 2015 -0800
----------------------------------------------------------------------
flume-ng-sinks/flume-dataset-sink/pom.xml | 2 +-
.../java/org/apache/flume/sink/kite/DatasetSink.java | 13 ++++++-------
.../apache/flume/sink/kite/policy/FailurePolicy.java | 4 ++--
.../org/apache/flume/sink/kite/policy/SavePolicy.java | 5 ++++-
.../org/apache/flume/sink/kite/TestDatasetSink.java | 8 ++++----
pom.xml | 4 ++--
6 files changed, 19 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/407874b9/flume-ng-sinks/flume-dataset-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/pom.xml b/flume-ng-sinks/flume-dataset-sink/pom.xml
index e929d60..ad3f603 100644
--- a/flume-ng-sinks/flume-dataset-sink/pom.xml
+++ b/flume-ng-sinks/flume-dataset-sink/pom.xml
@@ -93,7 +93,7 @@ limitations under the License.
<dependency>
<groupId>org.kitesdk</groupId>
- <artifactId>kite-data-hcatalog</artifactId>
+ <artifactId>kite-data-hive</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/flume/blob/407874b9/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index 3e66532..fd9f991 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -46,8 +46,9 @@ import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetNotFoundException;
import org.kitesdk.data.DatasetWriter;
-import org.kitesdk.data.DatasetWriterException;
import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Flushable;
+import org.kitesdk.data.Syncable;
import org.kitesdk.data.View;
import org.kitesdk.data.spi.Registration;
import org.kitesdk.data.URIBuilder;
@@ -305,10 +306,10 @@ public class DatasetSink extends AbstractSink implements Configurable {
if (commitOnBatch) {
// Flush/sync before committing. A failure here will result in rolling back
// the transaction
- if (syncOnBatch) {
- writer.sync();
- } else {
- writer.flush();
+ if (syncOnBatch && writer instanceof Syncable) {
+ ((Syncable) writer).sync();
+ } else if (writer instanceof Flushable) {
+ ((Flushable) writer).flush();
}
boolean committed = commitTransaction();
Preconditions.checkState(committed,
@@ -484,8 +485,6 @@ public class DatasetSink extends AbstractSink implements Configurable {
throw new EventDeliveryException("Check HDFS permissions/health. IO"
+ " error trying to close the writer for dataset " + datasetUri,
ex);
- } catch (DatasetWriterException ex) {
- throw new EventDeliveryException("Failure moving temp file.", ex);
} catch (RuntimeException ex) {
throw new EventDeliveryException("Error trying to close the writer for"
+ " dataset " + datasetUri, ex);
http://git-wip-us.apache.org/repos/asf/flume/blob/407874b9/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
index 47b6a25..f6f875a 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
@@ -22,7 +22,7 @@ import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.sink.kite.DatasetSink;
-import org.kitesdk.data.DatasetWriter;
+import org.kitesdk.data.Syncable;
/**
* A policy for dealing with non-recoverable event delivery failures.
@@ -68,7 +68,7 @@ public interface FailurePolicy {
* This allows the policy implementation to sync any data that it may not
* have fully handled.
*
- * See {@link DatasetWriter#sync()}.
+ * See {@link Syncable#sync()}.
*
* @throws EventDeliveryException The policy failed while syncing data.
* When this is thrown, the Flume transaction
http://git-wip-us.apache.org/repos/asf/flume/blob/407874b9/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
index ed47898..bd537ec 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
@@ -30,6 +30,7 @@ import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;
+import org.kitesdk.data.Syncable;
import org.kitesdk.data.View;
import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
@@ -87,7 +88,9 @@ public class SavePolicy implements FailurePolicy {
// dataset
close();
} else {
- writer.sync();
+ if (writer instanceof Syncable) {
+ ((Syncable) writer).sync();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/407874b9/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
index 58aa467..621920d 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -78,12 +78,12 @@ import static org.mockito.Mockito.*;
public class TestDatasetSink {
- public static final String FILE_REPO_URI = "repo:file:target/test-repo";
+ public static final String FILE_REPO_URI = "repo:file:target/test_repo";
public static final String DATASET_NAME = "test";
public static final String FILE_DATASET_URI =
- "dataset:file:target/test-repo/" + DATASET_NAME;
+ "dataset:file:target/test_repo/" + DATASET_NAME;
public static final String ERROR_DATASET_URI =
- "dataset:file:target/test-repo/failed-events";
+ "dataset:file:target/test_repo/failed_events";
public static final File SCHEMA_FILE = new File("target/record-schema.avsc");
public static final Schema RECORD_SCHEMA = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
@@ -254,7 +254,7 @@ public class TestDatasetSink {
@Test
public void testPartitionedData() throws EventDeliveryException {
- URI partitionedUri = URI.create("dataset:file:target/test-repo/partitioned");
+ URI partitionedUri = URI.create("dataset:file:target/test_repo/partitioned");
try {
Datasets.create(partitionedUri, new DatasetDescriptor.Builder(DESCRIPTOR)
.partitionStrategy(new PartitionStrategy.Builder()
http://git-wip-us.apache.org/repos/asf/flume/blob/407874b9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ea7ffe3..3e40558 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,7 @@ limitations under the License.
<elasticsearch.version>0.90.1</elasticsearch.version>
<hadoop2.version>2.4.0</hadoop2.version>
<thrift.version>0.7.0</thrift.version>
- <kite.version>0.17.1</kite.version>
+ <kite.version>1.0.0</kite.version>
<hive.version>0.13.1</hive.version>
<xalan.verion>2.7.1</xalan.verion>
<xerces.version>2.9.1</xerces.version>
@@ -1328,7 +1328,7 @@ limitations under the License.
</dependency>
<dependency>
<groupId>org.kitesdk</groupId>
- <artifactId>kite-data-hcatalog</artifactId>
+ <artifactId>kite-data-hive</artifactId>
<version>${kite.version}</version>
</dependency>
<dependency>