Posted to commits@flume.apache.org by hs...@apache.org on 2014/09/14 00:00:36 UTC

git commit: FLUME-2462. Remove use of deprecated methods in DatasetSink

Repository: flume
Updated Branches:
  refs/heads/trunk 72be82d30 -> acc965134


FLUME-2462. Remove use of deprecated methods in DatasetSink

(Ryan Blue via Hari)
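
For context on the change itself: the Kite SDK deprecated repository-based
dataset access (DatasetRepositories.open(repoUri).load(name)) in favor of
dataset URIs resolved through the Datasets entry point, and this commit moves
DatasetSink and its test to the URI-based calls. Below is a minimal,
illustrative sketch of the new call pattern; the class name and the
repository-URI/dataset-name literals are placeholders for this example only,
not values taken from the Flume code or configuration.

    import java.net.URI;

    import org.apache.avro.generic.GenericRecord;
    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.Datasets;
    import org.kitesdk.data.spi.URIBuilder;

    public class DatasetLoadSketch {
      public static void main(String[] args) {
        // Placeholder values; DatasetSink reads the real ones from its sink config.
        String repositoryURI = "repo:file:target/example-repo";
        String datasetName = "test";

        // Deprecated pattern removed by this commit:
        //   DatasetRepositories.open(repositoryURI).load(datasetName);

        // Replacement: build a dataset URI from the repo URI and name, then load it.
        // (Assumes the dataset was already created under that repository.)
        URI datasetUri = new URIBuilder(repositoryURI, datasetName).build();
        Dataset<GenericRecord> dataset = Datasets.load(datasetUri);
        System.out.println("Loaded " + datasetUri);
      }
    }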


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/acc96513
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/acc96513
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/acc96513

Branch: refs/heads/trunk
Commit: acc9651346fe5834cdf5cdf0eb417f624aab1d09
Parents: 72be82d
Author: Hari Shreedharan <hs...@apache.org>
Authored: Sat Sep 13 14:59:37 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Sat Sep 13 14:59:37 2014 -0700

----------------------------------------------------------------------
 .../org/apache/flume/sink/kite/DatasetSink.java |  5 +-
 .../apache/flume/sink/kite/TestDatasetSink.java | 87 ++++++++++----------
 2 files changed, 45 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/acc96513/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index 8f3ae51..4cd3027 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -52,9 +52,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetRepositories;
 import org.kitesdk.data.DatasetWriter;
 import org.kitesdk.data.Datasets;
+import org.kitesdk.data.spi.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -172,7 +172,8 @@ public class DatasetSink extends AbstractSink implements Configurable {
           new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
             @Override
             public Dataset<GenericRecord> run() {
-              return DatasetRepositories.open(repositoryURI).load(datasetName);
+              return Datasets.load(
+                  new URIBuilder(repositoryURI, datasetName).build());
             }
           });
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/acc96513/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
index b448b50..a277381 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -28,6 +28,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -62,10 +63,9 @@ import org.junit.Test;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
 import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.DatasetRepositories;
-import org.kitesdk.data.DatasetRepository;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.PartitionStrategy;
+import org.kitesdk.data.View;
 
 public class TestDatasetSink {
 
@@ -73,8 +73,6 @@ public class TestDatasetSink {
   public static final String DATASET_NAME = "test";
   public static final String FILE_DATASET_URI =
       "dataset:file:target/test-repo/" + DATASET_NAME;
-  public static final DatasetRepository REPO = DatasetRepositories
-      .open(FILE_REPO_URI);
   public static final File SCHEMA_FILE = new File("target/record-schema.avsc");
   public static final Schema RECORD_SCHEMA = new Schema.Parser().parse(
       "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
@@ -94,7 +92,7 @@ public class TestDatasetSink {
 
   Context config = null;
   Channel in = null;
-  List<GenericData.Record> expected = null;
+  List<GenericRecord> expected = null;
   private static final String DFS_DIR = "target/test/dfs";
   private static final String TEST_BUILD_DATA_KEY = "test.build.data";
   private static String oldTestBuildDataProp = null;
@@ -118,8 +116,8 @@ public class TestDatasetSink {
 
   @Before
   public void setup() throws EventDeliveryException {
-    REPO.delete(DATASET_NAME);
-    REPO.create(DATASET_NAME, DESCRIPTOR);
+    Datasets.delete(FILE_DATASET_URI);
+    Datasets.create(FILE_DATASET_URI, DESCRIPTOR);
 
     this.config = new Context();
     this.in = new MemoryChannel();
@@ -128,17 +126,17 @@ public class TestDatasetSink {
     config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, FILE_DATASET_URI);
 
     GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA);
-    expected = Lists.newArrayList(
+    expected = Lists.<GenericRecord>newArrayList(
         builder.set("id", "1").set("msg", "msg1").build(),
         builder.set("id", "2").set("msg", "msg2").build(),
         builder.set("id", "3").set("msg", "msg3").build());
 
     putToChannel(in, Iterables.transform(expected,
-        new Function<GenericData.Record, Event>() {
+        new Function<GenericRecord, Event>() {
           private int i = 0;
 
           @Override
-          public Event apply(@Nullable GenericData.Record rec) {
+          public Event apply(@Nullable GenericRecord rec) {
             this.i += 1;
             boolean useURI = (i % 2) == 0;
             return event(rec, RECORD_SCHEMA, SCHEMA_FILE, useURI);
@@ -148,7 +146,7 @@ public class TestDatasetSink {
 
   @After
   public void teardown() {
-    REPO.delete(DATASET_NAME);
+    Datasets.delete(FILE_DATASET_URI);
   }
 
   @Test
@@ -166,7 +164,7 @@ public class TestDatasetSink {
 
     Assert.assertEquals(
         Sets.newHashSet(expected),
-        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+        read(Datasets.load(FILE_DATASET_URI)));
     Assert.assertEquals("Should have committed", 0, remaining(in));
   }
 
@@ -185,7 +183,7 @@ public class TestDatasetSink {
 
     Assert.assertEquals(
         Sets.newHashSet(expected),
-        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+        read(Datasets.load(FILE_DATASET_URI)));
     Assert.assertEquals("Should have committed", 0, remaining(in));
   }
 
@@ -200,18 +198,17 @@ public class TestDatasetSink {
 
     Assert.assertEquals(
         Sets.newHashSet(expected),
-        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+        read(Datasets.load(FILE_DATASET_URI)));
     Assert.assertEquals("Should have committed", 0, remaining(in));
   }
 
   @Test
   public void testParquetDataset() throws EventDeliveryException {
     Datasets.delete(FILE_DATASET_URI);
-    Dataset<GenericData.Record> created = Datasets.create(FILE_DATASET_URI,
+    Dataset<GenericRecord> created = Datasets.create(FILE_DATASET_URI,
         new DatasetDescriptor.Builder(DESCRIPTOR)
             .format("parquet")
-            .build(),
-        GenericData.Record.class);
+            .build());
 
     DatasetSink sink = sink(in, config);
 
@@ -226,15 +223,16 @@ public class TestDatasetSink {
 
   @Test
   public void testPartitionedData() throws EventDeliveryException {
-    REPO.create("partitioned", new DatasetDescriptor.Builder(DESCRIPTOR)
-        .partitionStrategy(new PartitionStrategy.Builder()
-            .identity("id", 10) // partition by id
-            .build())
-        .build());
-
+    URI partitionedUri = URI.create("dataset:file:target/test-repo/partitioned");
     try {
+      Datasets.create(partitionedUri, new DatasetDescriptor.Builder(DESCRIPTOR)
+          .partitionStrategy(new PartitionStrategy.Builder()
+              .identity("id", 10) // partition by id
+              .build())
+          .build());
+
       config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI,
-          "dataset:file:target/test-repo/partitioned");
+          partitionedUri.toString());
       DatasetSink sink = sink(in, config);
 
       // run the sink
@@ -244,11 +242,11 @@ public class TestDatasetSink {
 
       Assert.assertEquals(
           Sets.newHashSet(expected),
-          read(REPO.<GenericData.Record>load("partitioned")));
+          read(Datasets.load(partitionedUri)));
       Assert.assertEquals("Should have committed", 0, remaining(in));
     } finally {
-      if (REPO.exists("partitioned")) {
-        REPO.delete("partitioned");
+      if (Datasets.exists(partitionedUri)) {
+        Datasets.delete(partitionedUri);
       }
     }
   }
@@ -260,19 +258,18 @@ public class TestDatasetSink {
     MiniDFSCluster cluster = new MiniDFSCluster
         .Builder(new Configuration())
         .build();
-    DatasetRepository hdfsRepo = null;
-    try {
-      FileSystem dfs = cluster.getFileSystem();
-      Configuration conf = dfs.getConf();
-      String repoURI = "repo:" + conf.get("fs.defaultFS") + "/tmp/repo";
 
+    FileSystem dfs = cluster.getFileSystem();
+    Configuration conf = dfs.getConf();
+
+    URI hdfsUri = URI.create(
+        "dataset:" + conf.get("fs.defaultFS") + "/tmp/repo" + DATASET_NAME);
+    try {
       // create a repository and dataset in HDFS
-      hdfsRepo = DatasetRepositories.open(repoURI);
-      hdfsRepo.create(DATASET_NAME, DESCRIPTOR);
+      Datasets.create(hdfsUri, DESCRIPTOR);
 
       // update the config to use the HDFS repository
-      config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI,
-          "dataset:" + conf.get("fs.defaultFS") + "/tmp/repo/" + DATASET_NAME);
+      config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, hdfsUri.toString());
 
       DatasetSink sink = sink(in, config);
 
@@ -283,12 +280,12 @@ public class TestDatasetSink {
 
       Assert.assertEquals(
           Sets.newHashSet(expected),
-          read(hdfsRepo.<GenericData.Record>load(DATASET_NAME)));
+          read(Datasets.load(hdfsUri)));
       Assert.assertEquals("Should have committed", 0, remaining(in));
 
     } finally {
-      if (hdfsRepo != null && hdfsRepo.exists(DATASET_NAME)) {
-        hdfsRepo.delete(DATASET_NAME);
+      if (Datasets.exists(hdfsUri)) {
+        Datasets.delete(hdfsUri);
       }
       cluster.shutdown();
     }
@@ -308,13 +305,13 @@ public class TestDatasetSink {
     sink.process(); // roll and process the third
     Assert.assertEquals(
         Sets.newHashSet(expected.subList(0, 2)),
-        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+        read(Datasets.load(FILE_DATASET_URI)));
     Assert.assertEquals("Should have committed", 0, remaining(in));
     sink.roll(); // roll at the next process call
     sink.process(); // roll, the channel is empty
     Assert.assertEquals(
         Sets.newHashSet(expected),
-        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+        read(Datasets.load(FILE_DATASET_URI)));
     sink.stop();
   }
 
@@ -326,7 +323,7 @@ public class TestDatasetSink {
 
     DatasetSink sink = sink(in, config);
 
-    Dataset<GenericData.Record> records = REPO.load(DATASET_NAME);
+    Dataset<GenericRecord> records = Datasets.load(FILE_DATASET_URI);
 
     // run the sink
     sink.start();
@@ -369,7 +366,7 @@ public class TestDatasetSink {
 
     Assert.assertEquals(
         Sets.newHashSet(expected),
-        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+        read(Datasets.load(FILE_DATASET_URI)));
     Assert.assertEquals("Should have committed", 0, remaining(in));
   }
 
@@ -430,10 +427,10 @@ public class TestDatasetSink {
     return sink;
   }
 
-  public static <T> HashSet<T> read(Dataset<T> dataset) {
+  public static <T> HashSet<T> read(View<T> view) {
     DatasetReader<T> reader = null;
     try {
-      reader = dataset.newReader();
+      reader = view.newReader();
       return Sets.newHashSet(reader.iterator());
     } finally {
       if (reader != null) {
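
The test changes follow the same pattern: dataset lifecycle calls go through
Datasets with dataset URIs (create/load/delete) instead of a shared
DatasetRepository, and the read() helper now accepts any View<T> rather than
only a Dataset<T>. The standalone sketch below illustrates that usage; the
URI, schema literal, and class/method names are illustrative stand-ins, not
code from the commit.

    import java.util.Iterator;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.DatasetDescriptor;
    import org.kitesdk.data.DatasetReader;
    import org.kitesdk.data.Datasets;
    import org.kitesdk.data.View;

    public class UriLifecycleSketch {
      public static void main(String[] args) {
        // Illustrative URI in the same style as the test's FILE_DATASET_URI.
        String uri = "dataset:file:target/example-repo/test";

        // Simple two-field record schema, similar in shape to the test's RECORD_SCHEMA.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
            "{\"name\":\"id\",\"type\":\"string\"}," +
            "{\"name\":\"msg\",\"type\":\"string\"}]}");
        DatasetDescriptor descriptor =
            new DatasetDescriptor.Builder().schema(schema).build();

        // Create, read, and delete entirely through the dataset URI.
        Dataset<GenericRecord> dataset = Datasets.create(uri, descriptor);
        printAll(dataset); // a Dataset is also a View, so the helper accepts it
        Datasets.delete(uri);
      }

      // Mirrors the reworked read() helper: it takes a View<T>, not a Dataset<T>.
      static <T> void printAll(View<T> view) {
        DatasetReader<T> reader = null;
        try {
          reader = view.newReader();
          Iterator<T> records = reader.iterator();
          while (records.hasNext()) {      // freshly created dataset, so prints nothing
            System.out.println(records.next());
          }
        } finally {
          if (reader != null) {
            reader.close();
          }
        }
      }
    }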