You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/09/14 00:00:36 UTC
git commit: FLUME-2462. Remove use of deprecated methods in
DatasetSink
Repository: flume
Updated Branches:
refs/heads/trunk 72be82d30 -> acc965134
FLUME-2462. Remove use of deprecated methods in DatasetSink
(Ryan Blue via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/acc96513
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/acc96513
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/acc96513
Branch: refs/heads/trunk
Commit: acc9651346fe5834cdf5cdf0eb417f624aab1d09
Parents: 72be82d
Author: Hari Shreedharan <hs...@apache.org>
Authored: Sat Sep 13 14:59:37 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Sat Sep 13 14:59:37 2014 -0700
----------------------------------------------------------------------
.../org/apache/flume/sink/kite/DatasetSink.java | 5 +-
.../apache/flume/sink/kite/TestDatasetSink.java | 87 ++++++++++----------
2 files changed, 45 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/acc96513/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index 8f3ae51..4cd3027 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -52,9 +52,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetRepositories;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
+import org.kitesdk.data.spi.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -172,7 +172,8 @@ public class DatasetSink extends AbstractSink implements Configurable {
new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
@Override
public Dataset<GenericRecord> run() {
- return DatasetRepositories.open(repositoryURI).load(datasetName);
+ return Datasets.load(
+ new URIBuilder(repositoryURI, datasetName).build());
}
});
}
http://git-wip-us.apache.org/repos/asf/flume/blob/acc96513/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
index b448b50..a277381 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -28,6 +28,7 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -62,10 +63,9 @@ import org.junit.Test;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.DatasetRepositories;
-import org.kitesdk.data.DatasetRepository;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.PartitionStrategy;
+import org.kitesdk.data.View;
public class TestDatasetSink {
@@ -73,8 +73,6 @@ public class TestDatasetSink {
public static final String DATASET_NAME = "test";
public static final String FILE_DATASET_URI =
"dataset:file:target/test-repo/" + DATASET_NAME;
- public static final DatasetRepository REPO = DatasetRepositories
- .open(FILE_REPO_URI);
public static final File SCHEMA_FILE = new File("target/record-schema.avsc");
public static final Schema RECORD_SCHEMA = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
@@ -94,7 +92,7 @@ public class TestDatasetSink {
Context config = null;
Channel in = null;
- List<GenericData.Record> expected = null;
+ List<GenericRecord> expected = null;
private static final String DFS_DIR = "target/test/dfs";
private static final String TEST_BUILD_DATA_KEY = "test.build.data";
private static String oldTestBuildDataProp = null;
@@ -118,8 +116,8 @@ public class TestDatasetSink {
@Before
public void setup() throws EventDeliveryException {
- REPO.delete(DATASET_NAME);
- REPO.create(DATASET_NAME, DESCRIPTOR);
+ Datasets.delete(FILE_DATASET_URI);
+ Datasets.create(FILE_DATASET_URI, DESCRIPTOR);
this.config = new Context();
this.in = new MemoryChannel();
@@ -128,17 +126,17 @@ public class TestDatasetSink {
config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, FILE_DATASET_URI);
GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA);
- expected = Lists.newArrayList(
+ expected = Lists.<GenericRecord>newArrayList(
builder.set("id", "1").set("msg", "msg1").build(),
builder.set("id", "2").set("msg", "msg2").build(),
builder.set("id", "3").set("msg", "msg3").build());
putToChannel(in, Iterables.transform(expected,
- new Function<GenericData.Record, Event>() {
+ new Function<GenericRecord, Event>() {
private int i = 0;
@Override
- public Event apply(@Nullable GenericData.Record rec) {
+ public Event apply(@Nullable GenericRecord rec) {
this.i += 1;
boolean useURI = (i % 2) == 0;
return event(rec, RECORD_SCHEMA, SCHEMA_FILE, useURI);
@@ -148,7 +146,7 @@ public class TestDatasetSink {
@After
public void teardown() {
- REPO.delete(DATASET_NAME);
+ Datasets.delete(FILE_DATASET_URI);
}
@Test
@@ -166,7 +164,7 @@ public class TestDatasetSink {
Assert.assertEquals(
Sets.newHashSet(expected),
- read(REPO.<GenericData.Record>load(DATASET_NAME)));
+ read(Datasets.load(FILE_DATASET_URI)));
Assert.assertEquals("Should have committed", 0, remaining(in));
}
@@ -185,7 +183,7 @@ public class TestDatasetSink {
Assert.assertEquals(
Sets.newHashSet(expected),
- read(REPO.<GenericData.Record>load(DATASET_NAME)));
+ read(Datasets.load(FILE_DATASET_URI)));
Assert.assertEquals("Should have committed", 0, remaining(in));
}
@@ -200,18 +198,17 @@ public class TestDatasetSink {
Assert.assertEquals(
Sets.newHashSet(expected),
- read(REPO.<GenericData.Record>load(DATASET_NAME)));
+ read(Datasets.load(FILE_DATASET_URI)));
Assert.assertEquals("Should have committed", 0, remaining(in));
}
@Test
public void testParquetDataset() throws EventDeliveryException {
Datasets.delete(FILE_DATASET_URI);
- Dataset<GenericData.Record> created = Datasets.create(FILE_DATASET_URI,
+ Dataset<GenericRecord> created = Datasets.create(FILE_DATASET_URI,
new DatasetDescriptor.Builder(DESCRIPTOR)
.format("parquet")
- .build(),
- GenericData.Record.class);
+ .build());
DatasetSink sink = sink(in, config);
@@ -226,15 +223,16 @@ public class TestDatasetSink {
@Test
public void testPartitionedData() throws EventDeliveryException {
- REPO.create("partitioned", new DatasetDescriptor.Builder(DESCRIPTOR)
- .partitionStrategy(new PartitionStrategy.Builder()
- .identity("id", 10) // partition by id
- .build())
- .build());
-
+ URI partitionedUri = URI.create("dataset:file:target/test-repo/partitioned");
try {
+ Datasets.create(partitionedUri, new DatasetDescriptor.Builder(DESCRIPTOR)
+ .partitionStrategy(new PartitionStrategy.Builder()
+ .identity("id", 10) // partition by id
+ .build())
+ .build());
+
config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI,
- "dataset:file:target/test-repo/partitioned");
+ partitionedUri.toString());
DatasetSink sink = sink(in, config);
// run the sink
@@ -244,11 +242,11 @@ public class TestDatasetSink {
Assert.assertEquals(
Sets.newHashSet(expected),
- read(REPO.<GenericData.Record>load("partitioned")));
+ read(Datasets.load(partitionedUri)));
Assert.assertEquals("Should have committed", 0, remaining(in));
} finally {
- if (REPO.exists("partitioned")) {
- REPO.delete("partitioned");
+ if (Datasets.exists(partitionedUri)) {
+ Datasets.delete(partitionedUri);
}
}
}
@@ -260,19 +258,18 @@ public class TestDatasetSink {
MiniDFSCluster cluster = new MiniDFSCluster
.Builder(new Configuration())
.build();
- DatasetRepository hdfsRepo = null;
- try {
- FileSystem dfs = cluster.getFileSystem();
- Configuration conf = dfs.getConf();
- String repoURI = "repo:" + conf.get("fs.defaultFS") + "/tmp/repo";
+ FileSystem dfs = cluster.getFileSystem();
+ Configuration conf = dfs.getConf();
+
+ URI hdfsUri = URI.create(
"dataset:" + conf.get("fs.defaultFS") + "/tmp/repo/" + DATASET_NAME);
+ try {
// create a repository and dataset in HDFS
- hdfsRepo = DatasetRepositories.open(repoURI);
- hdfsRepo.create(DATASET_NAME, DESCRIPTOR);
+ Datasets.create(hdfsUri, DESCRIPTOR);
// update the config to use the HDFS repository
- config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI,
- "dataset:" + conf.get("fs.defaultFS") + "/tmp/repo/" + DATASET_NAME);
+ config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, hdfsUri.toString());
DatasetSink sink = sink(in, config);
@@ -283,12 +280,12 @@ public class TestDatasetSink {
Assert.assertEquals(
Sets.newHashSet(expected),
- read(hdfsRepo.<GenericData.Record>load(DATASET_NAME)));
+ read(Datasets.load(hdfsUri)));
Assert.assertEquals("Should have committed", 0, remaining(in));
} finally {
- if (hdfsRepo != null && hdfsRepo.exists(DATASET_NAME)) {
- hdfsRepo.delete(DATASET_NAME);
+ if (Datasets.exists(hdfsUri)) {
+ Datasets.delete(hdfsUri);
}
cluster.shutdown();
}
@@ -308,13 +305,13 @@ public class TestDatasetSink {
sink.process(); // roll and process the third
Assert.assertEquals(
Sets.newHashSet(expected.subList(0, 2)),
- read(REPO.<GenericData.Record>load(DATASET_NAME)));
+ read(Datasets.load(FILE_DATASET_URI)));
Assert.assertEquals("Should have committed", 0, remaining(in));
sink.roll(); // roll at the next process call
sink.process(); // roll, the channel is empty
Assert.assertEquals(
Sets.newHashSet(expected),
- read(REPO.<GenericData.Record>load(DATASET_NAME)));
+ read(Datasets.load(FILE_DATASET_URI)));
sink.stop();
}
@@ -326,7 +323,7 @@ public class TestDatasetSink {
DatasetSink sink = sink(in, config);
- Dataset<GenericData.Record> records = REPO.load(DATASET_NAME);
+ Dataset<GenericRecord> records = Datasets.load(FILE_DATASET_URI);
// run the sink
sink.start();
@@ -369,7 +366,7 @@ public class TestDatasetSink {
Assert.assertEquals(
Sets.newHashSet(expected),
- read(REPO.<GenericData.Record>load(DATASET_NAME)));
+ read(Datasets.load(FILE_DATASET_URI)));
Assert.assertEquals("Should have committed", 0, remaining(in));
}
@@ -430,10 +427,10 @@ public class TestDatasetSink {
return sink;
}
- public static <T> HashSet<T> read(Dataset<T> dataset) {
+ public static <T> HashSet<T> read(View<T> view) {
DatasetReader<T> reader = null;
try {
- reader = dataset.newReader();
+ reader = view.newReader();
return Sets.newHashSet(reader.iterator());
} finally {
if (reader != null) {