You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/05/02 19:10:07 UTC
samza git commit: fix HdfsSystemAdmin when staging directory is empty
Repository: samza
Updated Branches:
refs/heads/master b71b253d2 -> 6726e1d10
fix HdfsSystemAdmin when staging directory is empty
getSystemStreamMetadata has the potential side effect of persisting metadata to a staging directory on HDFS. This could fail if the staging directory is empty. This patch addresses the issue and adds a test that covers the scenario.
Author: Hai Lu <ha...@linkedin.com>
Reviewers: Xinyu Liu <xi...@apache.org>
Closes #151 from lhaiesp/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6726e1d1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6726e1d1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6726e1d1
Branch: refs/heads/master
Commit: 6726e1d10964603826a23ad88896a27ae35ec150
Parents: b71b253
Author: Hai Lu <ha...@linkedin.com>
Authored: Tue May 2 12:09:55 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Tue May 2 12:09:55 2017 -0700
----------------------------------------------------------------------
.../samza/system/hdfs/HdfsSystemAdmin.java | 29 +++++++++++++++-
.../samza/system/hdfs/HdfsSystemConsumer.java | 7 ++++
.../hdfs/partitioner/DirectoryPartitioner.java | 7 ++--
.../system/hdfs/TestHdfsSystemConsumer.java | 36 +++++++++++++++++++-
4 files changed, 75 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/6726e1d1/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
index 8bf31c5..f5b05fb 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
@@ -21,6 +21,7 @@ package org.apache.samza.system.hdfs;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -116,10 +117,17 @@ public class HdfsSystemAdmin implements SystemAdmin {
}
static Map<Partition, List<String>> obtainPartitionDescriptorMap(String stagingDirectory, String streamName) {
+ if (StringUtils.isBlank(stagingDirectory)) {
+ LOG.info("Empty or null staging directory: {}", stagingDirectory);
+ return Collections.emptyMap();
+ }
+ if (StringUtils.isBlank(streamName)) {
+ throw new SamzaException(String.format("stream name (%s) is null or empty!", streamName));
+ }
Path path = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
try (FileSystem fs = path.getFileSystem(new Configuration())) {
if (!fs.exists(path)) {
- return null;
+ return Collections.emptyMap();
}
try (FSDataInputStream fis = fs.open(path)) {
String json = IOUtils.toString(fis, StandardCharsets.UTF_8);
@@ -135,6 +143,10 @@ public class HdfsSystemAdmin implements SystemAdmin {
*/
private void persistPartitionDescriptor(String streamName,
Map<Partition, List<String>> partitionDescriptorMap) {
+ if (StringUtils.isBlank(stagingDirectory) || StringUtils.isBlank(streamName)) {
+ LOG.warn("Staging directory ({}) or stream name ({}) is empty", stagingDirectory, streamName);
+ return;
+ }
Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
try (FileSystem fs = targetPath.getFileSystem(new Configuration())) {
// Partition descriptor is supposed to be immutable. So don't override it if it exists.
@@ -153,6 +165,10 @@ public class HdfsSystemAdmin implements SystemAdmin {
}
private boolean partitionDescriptorExists(String streamName) {
+ if (StringUtils.isBlank(stagingDirectory) || StringUtils.isBlank(streamName)) {
+ LOG.warn("Staging directory ({}) or stream name ({}) is empty", stagingDirectory, streamName);
+ return false;
+ }
Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
try (FileSystem fs = targetPath.getFileSystem(new Configuration())) {
return fs.exists(targetPath);
@@ -161,6 +177,17 @@ public class HdfsSystemAdmin implements SystemAdmin {
}
}
+ /**
+ *
+ * Fetch metadata from hdfs system for a set of streams. This has the potential side effect
+ * to persist partition description to the staging directory on hdfs if staging directory
+ * is not empty. See getStagingDirectory on {@link HdfsConfig}
+ *
+ * @param streamNames
+ * The streams to to fetch metadata for.
+ * @return A map from stream name to SystemStreamMetadata for each stream
+ * requested in the parameter set.
+ */
@Override
public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
Map<String, SystemStreamMetadata> systemStreamMetadataMap = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/samza/blob/6726e1d1/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
index 13a7102..fb9bb56 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
@@ -132,6 +133,12 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap {
public Map<Partition, List<String>> load(String streamName)
throws Exception {
Validate.notEmpty(streamName);
+ if (StringUtils.isBlank(stagingDirectory)) {
+ throw new SamzaException("Staging directory can't be empty. "
+ + "Is this not a yarn job (currently hdfs system consumer only works in "
+ + "the same yarn environment on which hdfs is running)? " + "Is STAGING_DIRECTORY ("
+ + HdfsConfig.STAGING_DIRECTORY() + ") not set (see HdfsConfig.scala)?");
+ }
return HdfsSystemAdmin.obtainPartitionDescriptorMap(stagingDirectory, streamName);
}
});
http://git-wip-us.apache.org/repos/asf/samza/blob/6726e1d1/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
index 5cad1e4..0661139 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
@@ -20,6 +20,7 @@
package org.apache.samza.system.hdfs.partitioner;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -32,6 +33,8 @@ import java.util.regex.Pattern;
import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
@@ -192,12 +195,12 @@ public class DirectoryPartitioner {
public Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadataMap(String streamName,
@Nullable Map<Partition, List<String>> existingPartitionDescriptorMap) {
LOG.info("Trying to obtain metadata for " + streamName);
- LOG.info("Existing partition descriptor: " + (existingPartitionDescriptorMap == null ? "empty"
+ LOG.info("Existing partition descriptor: " + (MapUtils.isEmpty(existingPartitionDescriptorMap) ? "empty"
: existingPartitionDescriptorMap));
Map<Partition, SystemStreamPartitionMetadata> partitionMetadataMap = new HashMap<>();
partitionDescriptorMap.putIfAbsent(streamName, new HashMap<>());
List<FileMetadata> filteredFiles = getFilteredFiles(streamName);
- if (existingPartitionDescriptorMap != null) {
+ if (!MapUtils.isEmpty(existingPartitionDescriptorMap)) {
filteredFiles = validateAndGetOriginalFilteredFiles(filteredFiles, existingPartitionDescriptorMap);
}
List<List<FileMetadata>> groupedPartitions = generatePartitionGroups(filteredFiles);
http://git-wip-us.apache.org/repos/asf/samza/blob/6726e1d1/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
index ef5ab00..21afcb9 100644
--- a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
@@ -31,6 +31,7 @@ import java.util.Set;
import org.apache.avro.generic.GenericRecord;
import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -41,7 +42,7 @@ import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.Assert;
import org.junit.Test;
-
+import com.google.common.util.concurrent.UncheckedExecutionException;
public class TestHdfsSystemConsumer {
@@ -133,4 +134,37 @@ public class TestHdfsSystemConsumer {
Assert.assertEquals(messages.get(NUM_EVENTS).getOffset(), IncomingMessageEnvelope.END_OF_STREAM_OFFSET);
});
}
+
+ /*
+ * Ensure that empty staging directory will not break system admin,
+ * but should fail system consumer
+ */
+ @Test
+ public void testEmptyStagingDirectory() throws Exception {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), SYSTEM_NAME), ".*avro");
+ Config config = new MapConfig(configMap);
+ HdfsSystemFactory systemFactory = new HdfsSystemFactory();
+
+ // create admin and do partitioning
+ HdfsSystemAdmin systemAdmin = systemFactory.getAdmin(SYSTEM_NAME, config);
+ String stream = WORKING_DIRECTORY;
+ Set<String> streamNames = new HashSet<>();
+ streamNames.add(stream);
+ generateAvroDataFiles();
+ Map<String, SystemStreamMetadata> streamMetadataMap = systemAdmin.getSystemStreamMetadata(streamNames);
+ SystemStreamMetadata systemStreamMetadata = streamMetadataMap.get(stream);
+ Assert.assertEquals(NUM_FILES, systemStreamMetadata.getSystemStreamPartitionMetadata().size());
+
+ // create consumer and read from files
+ HdfsSystemConsumer systemConsumer = systemFactory.getConsumer(SYSTEM_NAME, config, new NoOpMetricsRegistry());
+ Partition partition = new Partition(0);
+ SystemStreamPartition ssp = new SystemStreamPartition(SYSTEM_NAME, stream, partition);
+ try {
+ systemConsumer.register(ssp, "0");
+ Assert.fail("Empty staging directory should fail system consumer");
+ } catch (UncheckedExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof SamzaException);
+ }
+ }
}