Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/01 15:24:16 UTC
[1/3] storm git commit: [STORM-2916] separate hdfs-blobstore from storm-hdfs
Repository: storm
Updated Branches:
refs/heads/master 18045a3fc -> d68416b24
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
deleted file mode 100644
index 7130153..0000000
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hdfs.blobstore;
-
-import java.io.ByteArrayOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import javax.security.auth.Subject;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.storm.Config;
-import org.apache.storm.blobstore.AtomicOutputStream;
-import org.apache.storm.blobstore.BlobStore;
-import org.apache.storm.blobstore.BlobStoreAclHandler;
-import org.apache.storm.blobstore.BlobStoreFile;
-import org.apache.storm.blobstore.InputStreamWithMeta;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyAlreadyExistsException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.ReadableBlobMeta;
-import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.storm.blobstore.BlobStoreAclHandler.ADMIN;
-import static org.apache.storm.blobstore.BlobStoreAclHandler.READ;
-import static org.apache.storm.blobstore.BlobStoreAclHandler.WRITE;
-
-/**
- * Provides an HDFS file-system-backed blob store implementation.
- * Note that this provides an API for using HDFS as the backing store for the blobstore;
- * it is not a service/daemon.
- *
- * We currently have NIMBUS_ADMINS and SUPERVISOR_ADMINS configurations. NIMBUS_ADMINS are given READ, WRITE and ADMIN
- * access whereas the SUPERVISOR_ADMINS are given READ access in order to read and download the blobs from the nimbus.
- *
- * The ACLs for the blob store are validated against whether the subject is a NIMBUS_ADMIN, SUPERVISOR_ADMIN or a USER
- * who has READ, WRITE or ADMIN privileges for the requested operation on the blob.
- *
- * For the HDFS blob store:
- * 1. The USER interacts with nimbus to upload and access blobs through the NimbusBlobStore client API. Here, unlike the
- * local blob store which stores the blobs locally, nimbus talks to HDFS to upload the blobs.
- * 2. The USER sets the ACLs, and the blob access is validated against these ACLs.
- * 3. The SUPERVISOR uses HdfsClientBlobStore to download the blobs. Here, unlike with the local blob store, the
- * supervisor talks to HDFS directly rather than going through nimbus. The call to HdfsBlobStore is made with a "null"
- * subject; the blob store then gets the Hadoop user and validates permissions for the supervisor.
- */
-public class HdfsBlobStore extends BlobStore {
- public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStore.class);
- private static final String DATA_PREFIX = "data_";
- private static final String META_PREFIX = "meta_";
- private static final HashMap<String, Subject> alreadyLoggedInUsers = new HashMap<>();
-
- private BlobStoreAclHandler aclHandler;
- private HdfsBlobStoreImpl hbs;
- private Subject localSubject;
- private Map<String, Object> conf;
-
- /**
- * Get the subject from Hadoop so we can use it to validate the ACLs. There is no direct
- * interface from UserGroupInformation to get the subject, so do a doAs and get the context.
- * We could probably run everything in the doAs but for now just grab the subject.
- */
- private Subject getHadoopUser() {
- Subject subj;
- try {
- subj = UserGroupInformation.getCurrentUser().doAs(
- new PrivilegedAction<Subject>() {
- @Override
- public Subject run() {
- return Subject.getSubject(AccessController.getContext());
- }
- });
- } catch (IOException e) {
- throw new RuntimeException("Error creating subject and logging user in!", e);
- }
- return subj;
- }
-
- /**
- * If who is null then we want to use the user Hadoop says we are.
- * Required for the supervisor to call these routines, as it's not
- * logged in as anyone.
- */
- private Subject checkAndGetSubject(Subject who) {
- if (who == null) {
- return localSubject;
- }
- return who;
- }
-
- @Override
- public void prepare(Map<String, Object> conf, String overrideBase, NimbusInfo nimbusInfo) {
- this.conf = conf;
- prepareInternal(conf, overrideBase, null);
- }
-
- /**
- * Allow a Hadoop Configuration to be passed for testing. If it's null then the Hadoop configs
- * must be on your classpath.
- */
- protected void prepareInternal(Map<String, Object> conf, String overrideBase, Configuration hadoopConf) {
- this.conf = conf;
- if (overrideBase == null) {
- overrideBase = (String)conf.get(Config.BLOBSTORE_DIR);
- }
- if (overrideBase == null) {
- throw new RuntimeException("You must specify a blobstore directory for HDFS to use!");
- }
- LOG.debug("directory is: {}", overrideBase);
- try {
- // If an HDFS keytab/principal has been supplied, log in; otherwise assume the user is
- // already logged in or is running against insecure HDFS.
- String principal = (String) conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL);
- String keyTab = (String) conf.get(Config.BLOBSTORE_HDFS_KEYTAB);
-
- if (principal != null && keyTab != null) {
- String combinedKey = principal + " from " + keyTab;
- synchronized (alreadyLoggedInUsers) {
- localSubject = alreadyLoggedInUsers.get(combinedKey);
- if (localSubject == null) {
- UserGroupInformation.loginUserFromKeytab(principal, keyTab);
- localSubject = getHadoopUser();
- alreadyLoggedInUsers.put(combinedKey, localSubject);
- }
- }
- } else {
- if (principal == null && keyTab != null) {
- throw new RuntimeException("You must specify an HDFS principal to go with the keytab!");
- } else if (principal != null && keyTab == null) {
- throw new RuntimeException("You must specify an HDFS keytab to go with the principal!");
- }
- localSubject = getHadoopUser();
- }
- } catch (IOException e) {
- throw new RuntimeException("Error logging in from keytab!", e);
- }
- aclHandler = new BlobStoreAclHandler(conf);
- Path baseDir = new Path(overrideBase, BASE_BLOBS_DIR_NAME);
- try {
- if (hadoopConf != null) {
- hbs = new HdfsBlobStoreImpl(baseDir, conf, hadoopConf);
- } else {
- hbs = new HdfsBlobStoreImpl(baseDir, conf);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who)
- throws AuthorizationException, KeyAlreadyExistsException {
- if (meta.get_replication_factor() <= 0) {
- meta.set_replication_factor((int)conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR));
- }
- who = checkAndGetSubject(who);
- validateKey(key);
- aclHandler.normalizeSettableBlobMeta(key, meta, who, READ | WRITE | ADMIN);
- BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
- aclHandler.hasPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
- if (hbs.exists(DATA_PREFIX + key)) {
- throw new KeyAlreadyExistsException(key);
- }
- BlobStoreFileOutputStream mOut = null;
- try {
- BlobStoreFile metaFile = hbs.write(META_PREFIX + key, true);
- metaFile.setMetadata(meta);
- mOut = new BlobStoreFileOutputStream(metaFile);
- mOut.write(Utils.thriftSerialize(meta));
- mOut.close();
- mOut = null;
- BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, true);
- dataFile.setMetadata(meta);
- return new BlobStoreFileOutputStream(dataFile);
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- if (mOut != null) {
- try {
- mOut.cancel();
- } catch (IOException e) {
- //Ignored
- }
- }
- }
- }
-
- @Override
- public AtomicOutputStream updateBlob(String key, Subject who)
- throws AuthorizationException, KeyNotFoundException {
- who = checkAndGetSubject(who);
- validateKey(key);
- SettableBlobMeta meta = getStoredBlobMeta(key);
- aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
- try {
- BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, false);
- dataFile.setMetadata(meta);
- return new BlobStoreFileOutputStream(dataFile);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private SettableBlobMeta getStoredBlobMeta(String key) throws KeyNotFoundException {
- InputStream in = null;
- try {
- BlobStoreFile pf = hbs.read(META_PREFIX + key);
- try {
- in = pf.getInputStream();
- } catch (FileNotFoundException fnf) {
- throw new KeyNotFoundException(key);
- }
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- byte[] buffer = new byte[2048];
- int len;
- while ((len = in.read(buffer)) > 0) {
- out.write(buffer, 0, len);
- }
- in.close();
- in = null;
- return Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray());
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- if (in != null) {
- try {
- in.close();
- } catch (IOException e) {
- //Ignored
- }
- }
- }
- }
-
- @Override
- public ReadableBlobMeta getBlobMeta(String key, Subject who)
- throws AuthorizationException, KeyNotFoundException {
- who = checkAndGetSubject(who);
- validateKey(key);
- SettableBlobMeta meta = getStoredBlobMeta(key);
- aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key);
- ReadableBlobMeta rbm = new ReadableBlobMeta();
- rbm.set_settable(meta);
- try {
- BlobStoreFile pf = hbs.read(DATA_PREFIX + key);
- rbm.set_version(pf.getModTime());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return rbm;
- }
-
- @Override
- public void setBlobMeta(String key, SettableBlobMeta meta, Subject who)
- throws AuthorizationException, KeyNotFoundException {
- if (meta.get_replication_factor() <= 0) {
- meta.set_replication_factor((int)conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR));
- }
- who = checkAndGetSubject(who);
- validateKey(key);
- aclHandler.normalizeSettableBlobMeta(key, meta, who, ADMIN);
- BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
- SettableBlobMeta orig = getStoredBlobMeta(key);
- aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key);
- writeMetadata(key, meta);
- }
-
- @Override
- public void deleteBlob(String key, Subject who)
- throws AuthorizationException, KeyNotFoundException {
- who = checkAndGetSubject(who);
- validateKey(key);
- SettableBlobMeta meta = getStoredBlobMeta(key);
- aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
- try {
- hbs.deleteKey(DATA_PREFIX + key);
- hbs.deleteKey(META_PREFIX + key);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public InputStreamWithMeta getBlob(String key, Subject who)
- throws AuthorizationException, KeyNotFoundException {
- who = checkAndGetSubject(who);
- validateKey(key);
- SettableBlobMeta meta = getStoredBlobMeta(key);
- aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
- try {
- return new BlobStoreFileInputStream(hbs.read(DATA_PREFIX + key));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Iterator<String> listKeys() {
- try {
- return new KeyTranslationIterator(hbs.listKeys(), DATA_PREFIX);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void shutdown() {
- //Empty
- }
-
- @Override
- public int getBlobReplication(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
- who = checkAndGetSubject(who);
- validateKey(key);
- SettableBlobMeta meta = getStoredBlobMeta(key);
- aclHandler.hasAnyPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
- try {
- return hbs.getBlobReplication(DATA_PREFIX + key);
- } catch (IOException exp) {
- throw new RuntimeException(exp);
- }
- }
-
- @Override
- public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException {
- who = checkAndGetSubject(who);
- validateKey(key);
- SettableBlobMeta meta = getStoredBlobMeta(key);
- meta.set_replication_factor(replication);
- aclHandler.hasAnyPermissions(meta.get_acl(), WRITE | ADMIN, who, key);
- try {
- writeMetadata(key, meta);
- return hbs.updateBlobReplication(DATA_PREFIX + key, replication);
- } catch (IOException exp) {
- throw new RuntimeException(exp);
- }
- }
-
- public void writeMetadata(String key, SettableBlobMeta meta)
- throws AuthorizationException, KeyNotFoundException {
- BlobStoreFileOutputStream mOut = null;
- try {
- BlobStoreFile hdfsFile = hbs.write(META_PREFIX + key, false);
- hdfsFile.setMetadata(meta);
- mOut = new BlobStoreFileOutputStream(hdfsFile);
- mOut.write(Utils.thriftSerialize(meta));
- mOut.close();
- mOut = null;
- } catch (IOException exp) {
- throw new RuntimeException(exp);
- } finally {
- if (mOut != null) {
- try {
- mOut.cancel();
- } catch (IOException e) {
- //Ignored
- }
- }
- }
- }
-
- public void fullCleanup(long age) throws IOException {
- hbs.fullCleanup(age);
- }
-}
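
For context on the class removed above, here is a minimal usage sketch showing how the store can be prepared and written to. The config keys are the real org.apache.storm.Config options the class reads in prepareInternal(); the blobstore URI, key name, and the principal/keytab values (only needed on a secure cluster) are hypothetical.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.Config;
    import org.apache.storm.blobstore.AtomicOutputStream;
    import org.apache.storm.blobstore.BlobStoreAclHandler;
    import org.apache.storm.generated.SettableBlobMeta;
    import org.apache.storm.hdfs.blobstore.HdfsBlobStore;

    public class HdfsBlobStoreUsageSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> conf = new HashMap<>();
            conf.put(Config.BLOBSTORE_DIR, "hdfs://namenode:8020/storm/blobstore"); // hypothetical URI
            conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3);
            // Only needed on a secure cluster; otherwise the already-logged-in Hadoop user is used.
            conf.put(Config.BLOBSTORE_HDFS_PRINCIPAL, "storm/node@EXAMPLE.COM");  // hypothetical
            conf.put(Config.BLOBSTORE_HDFS_KEYTAB, "/etc/storm/storm.keytab");    // hypothetical

            HdfsBlobStore store = new HdfsBlobStore();
            store.prepare(conf, null, null); // null overrideBase falls back to BLOBSTORE_DIR

            SettableBlobMeta meta = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
            try (AtomicOutputStream out = store.createBlob("my-key", meta, null)) { // null Subject -> Hadoop user
                out.write(1);
            }
            store.shutdown();
        }
    }
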
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
deleted file mode 100644
index 0192d94..0000000
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.blobstore;
-
-import org.apache.storm.blobstore.BlobStoreFile;
-import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.regex.Matcher;
-
-public class HdfsBlobStoreFile extends BlobStoreFile {
- public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreFile.class);
-
- private final String _key;
- private final boolean _isTmp;
- private final Path _path;
- private Long _modTime = null;
- private final boolean _mustBeNew;
- private final Configuration _hadoopConf;
- private final FileSystem _fs;
- private SettableBlobMeta meta;
-
- // files are world readable and owner writable
- public static final FsPermission BLOBSTORE_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0644); // rw-r--r--
-
- public HdfsBlobStoreFile(Path base, String name, Configuration hconf) {
- if (BLOBSTORE_DATA_FILE.equals(name)) {
- _isTmp = false;
- } else {
- Matcher m = TMP_NAME_PATTERN.matcher(name);
- if (!m.matches()) {
- throw new IllegalArgumentException("File name does not match '"+name+"' !~ "+TMP_NAME_PATTERN);
- }
- _isTmp = true;
- }
- _hadoopConf = hconf;
- _key = base.getName();
- _path = new Path(base, name);
- _mustBeNew = false;
- try {
- _fs = _path.getFileSystem(_hadoopConf);
- } catch (IOException e) {
- throw new RuntimeException("Error getting filesystem for path: " + _path, e);
- }
- }
-
- public HdfsBlobStoreFile(Path base, boolean isTmp, boolean mustBeNew, Configuration hconf) {
- _key = base.getName();
- _hadoopConf = hconf;
- _isTmp = isTmp;
- _mustBeNew = mustBeNew;
- if (_isTmp) {
- _path = new Path(base, System.currentTimeMillis()+TMP_EXT);
- } else {
- _path = new Path(base, BLOBSTORE_DATA_FILE);
- }
- try {
- _fs = _path.getFileSystem(_hadoopConf);
- } catch (IOException e) {
- throw new RuntimeException("Error getting filesystem for path: " + _path, e);
- }
- }
-
- @Override
- public void delete() throws IOException {
- _fs.delete(_path, true);
- }
-
- @Override
- public boolean isTmp() {
- return _isTmp;
- }
-
- @Override
- public String getKey() {
- return _key;
- }
-
- @Override
- public long getModTime() throws IOException {
- if (_modTime == null) {
- FileSystem fs = _path.getFileSystem(_hadoopConf);
- _modTime = fs.getFileStatus(_path).getModificationTime();
- }
- return _modTime;
- }
-
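- // Note: despite the name, this asserts that the file IS a temporary part.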
- private void checkIsNotTmp() {
- if (!isTmp()) {
- throw new IllegalStateException("Can only operate on a temporary blobstore file.");
- }
- }
-
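- // Note: despite the name, this asserts that the file is NOT a temporary part.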
- private void checkIsTmp() {
- if (isTmp()) {
- throw new IllegalStateException("Cannot operate on a temporary blobstore file.");
- }
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- checkIsTmp();
- return _fs.open(_path);
- }
-
- @Override
- public OutputStream getOutputStream() throws IOException {
- checkIsNotTmp();
- OutputStream out = null;
- FsPermission fileperms = new FsPermission(BLOBSTORE_FILE_PERMISSION);
- try {
- out = _fs.create(_path, (short)this.getMetadata().get_replication_factor());
- _fs.setPermission(_path, fileperms);
- _fs.setReplication(_path, (short)this.getMetadata().get_replication_factor());
- } catch (IOException e) {
- //Try to create the parent directory; this may fail
- FsPermission dirperms = new FsPermission(HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION);
- if (!_fs.mkdirs(_path.getParent(), dirperms)) {
- LOG.warn("Error creating parent dir: " + _path.getParent());
- }
- out = _fs.create(_path, (short)this.getMetadata().get_replication_factor());
- _fs.setPermission(_path, fileperms); // the file gets the file permission, not the dir permission
- _fs.setReplication(_path, (short)this.getMetadata().get_replication_factor());
- }
- if (out == null) {
- throw new IOException("Error in creating: " + _path);
- }
- return out;
- }
-
- @Override
- public void commit() throws IOException {
- checkIsNotTmp();
- // FileContext supports atomic rename, whereas FileSystem doesn't
- FileContext fc = FileContext.getFileContext(_hadoopConf);
- Path dest = new Path(_path.getParent(), BLOBSTORE_DATA_FILE);
- if (_mustBeNew) {
- fc.rename(_path, dest);
- } else {
- fc.rename(_path, dest, Options.Rename.OVERWRITE);
- }
- // Note, we could add support for setting the replication factor
- }
-
- @Override
- public void cancel() throws IOException {
- checkIsNotTmp();
- delete();
- }
-
- @Override
- public String toString() {
- return _path+":"+(_isTmp ? "tmp": BlobStoreFile.BLOBSTORE_DATA_FILE)+":"+_key;
- }
-
- @Override
- public long getFileLength() throws IOException {
- return _fs.getFileStatus(_path).getLen();
- }
-
- @Override
- public SettableBlobMeta getMetadata() {
- return meta;
- }
-
- @Override
- public void setMetadata(SettableBlobMeta meta) {
- this.meta = meta;
- }
-}
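
The commit() above goes through FileContext because, as its inline comment notes, FileContext.rename is atomic while FileSystem.rename is not guaranteed to be. A standalone sketch of that rename step, with hypothetical paths:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Options;
    import org.apache.hadoop.fs.Path;

    public class AtomicRenameSketch {
        public static void main(String[] args) throws Exception {
            FileContext fc = FileContext.getFileContext(new Configuration());
            Path tmp = new Path("/storm/blobstore/0/my-key/1517500000000.tmp"); // hypothetical tmp part
            Path dest = new Path("/storm/blobstore/0/my-key/data");
            // Rename.OVERWRITE atomically replaces an existing "data" file; without it the
            // rename fails when dest already exists, which is what the _mustBeNew case relies on.
            fc.rename(tmp, dest, Options.Rename.OVERWRITE);
        }
    }
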
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
deleted file mode 100644
index eb9d0b8..0000000
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.blobstore;
-
-import org.apache.storm.Config;
-import org.apache.storm.blobstore.BlobStoreFile;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Timer;
-import java.util.TimerTask;
-
-/**
- * HDFS blob store impl.
- */
-public class HdfsBlobStoreImpl {
- private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImpl.class);
-
- private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000L;
- private static final int BUCKETS = 1024;
- private static final String BLOBSTORE_DATA = "data";
-
- private Timer timer;
-
- public class KeyInHashDirIterator implements Iterator<String> {
- private int currentBucket = 0;
- private Iterator<String> it = null;
- private String next = null;
-
- public KeyInHashDirIterator() throws IOException {
- primeNext();
- }
-
- private void primeNext() throws IOException {
- while (it == null && currentBucket < BUCKETS) {
- String name = String.valueOf(currentBucket);
- Path dir = new Path(_fullPath, name);
- try {
- it = listKeys(dir);
- } catch (FileNotFoundException e) {
- it = null;
- }
- if (it == null || !it.hasNext()) {
- it = null;
- currentBucket++;
- } else {
- next = it.next();
- }
- }
- }
-
- @Override
- public boolean hasNext() {
- return next != null;
- }
-
- @Override
- public String next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- String current = next;
- next = null;
- if (it != null) {
- if (!it.hasNext()) {
- it = null;
- currentBucket++;
- try {
- primeNext();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- } else {
- next = it.next();
- }
- }
- return current;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Delete Not Supported");
- }
- }
-
-
- private Path _fullPath;
- private FileSystem _fs;
- private Configuration _hadoopConf;
-
- // blobstore directory is private!
- public static final FsPermission BLOBSTORE_DIR_PERMISSION =
- FsPermission.createImmutable((short) 0700); // rwx------
-
- public HdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws IOException {
- this(path, conf, new Configuration());
- }
-
- public HdfsBlobStoreImpl(Path path, Map<String, Object> conf,
- Configuration hconf) throws IOException {
- LOG.info("Blob store based in {}", path);
- _fullPath = path;
- _hadoopConf = hconf;
- _fs = path.getFileSystem(_hadoopConf);
-
- if (!_fs.exists(_fullPath)) {
- FsPermission perms = new FsPermission(BLOBSTORE_DIR_PERMISSION);
- boolean success = _fs.mkdirs(_fullPath, perms);
- if (!success) {
- throw new IOException("Error creating blobstore directory: " + _fullPath);
- }
- }
-
- Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE);
- if (ObjectReader.getBoolean(shouldCleanup, false)) {
- LOG.debug("Starting hdfs blobstore cleaner");
- TimerTask cleanup = new TimerTask() {
- @Override
- public void run() {
- try {
- fullCleanup(FULL_CLEANUP_FREQ);
- } catch (IOException e) {
- LOG.error("Error trying to cleanup", e);
- }
- }
- };
- timer = new Timer("HdfsBlobStore cleanup thread", true);
- timer.scheduleAtFixedRate(cleanup, 0, FULL_CLEANUP_FREQ);
- }
- }
-
- /**
- * @return all keys that are available for reading.
- * @throws IOException on any error.
- */
- public Iterator<String> listKeys() throws IOException {
- return new KeyInHashDirIterator();
- }
-
- /**
- * Get an input stream for reading a part.
- *
- * @param key the key of the part to read.
- * @return the file to read the data from.
- * @throws IOException on any error
- */
- public BlobStoreFile read(String key) throws IOException {
- return new HdfsBlobStoreFile(getKeyDir(key), BLOBSTORE_DATA, _hadoopConf);
- }
-
- /**
- * Get an object tied to writing the data.
- *
- * @param key the key of the part to write to.
- * @param create whether the file needs to be new or not.
- * @return an object that can be used to write the data and to commit or cancel the operation.
- * @throws IOException on any error
- */
- public BlobStoreFile write(String key, boolean create) throws IOException {
- return new HdfsBlobStoreFile(getKeyDir(key), true, create, _hadoopConf);
- }
-
- /**
- * Check if the key exists in the blob store.
- *
- * @param key the key to check for
- * @return true if it exists else false.
- */
- public boolean exists(String key) {
- Path dir = getKeyDir(key);
- boolean res = false;
- try {
- _fs = dir.getFileSystem(_hadoopConf);
- res = _fs.exists(dir);
- } catch (IOException e) {
- LOG.warn("Exception checking for exists on: " + key);
- }
- return res;
- }
-
- /**
- * Delete a key from the blob store
- *
- * @param key the key to delete
- * @throws IOException on any error
- */
- public void deleteKey(String key) throws IOException {
- Path keyDir = getKeyDir(key);
- HdfsBlobStoreFile pf = new HdfsBlobStoreFile(keyDir, BLOBSTORE_DATA,
- _hadoopConf);
- pf.delete();
- delete(keyDir);
- }
-
- protected Path getKeyDir(String key) {
- String hash = String.valueOf(Math.abs((long) key.hashCode()) % BUCKETS);
- Path hashDir = new Path(_fullPath, hash);
-
- Path ret = new Path(hashDir, key);
- LOG.debug("{} Looking for {} in {}", new Object[]{_fullPath, key, hash});
- return ret;
- }
-
- public void fullCleanup(long age) throws IOException {
- long cleanUpIfBefore = System.currentTimeMillis() - age;
- Iterator<String> keys = new KeyInHashDirIterator();
- while (keys.hasNext()) {
- String key = keys.next();
- Path keyDir = getKeyDir(key);
- Iterator<BlobStoreFile> i = listBlobStoreFiles(keyDir);
- if (!i.hasNext()) {
- //The dir is empty, so try to delete it, may fail, but that is OK
- try {
- _fs.delete(keyDir, true);
- } catch (Exception e) {
- LOG.warn("Could not delete " + keyDir + " will try again later");
- }
- }
- while (i.hasNext()) {
- BlobStoreFile f = i.next();
- if (f.isTmp()) {
- if (f.getModTime() <= cleanUpIfBefore) {
- f.delete();
- }
- }
- }
- }
- }
-
- protected Iterator<BlobStoreFile> listBlobStoreFiles(Path path) throws IOException {
- ArrayList<BlobStoreFile> ret = new ArrayList<BlobStoreFile>();
- FileStatus[] files = _fs.listStatus(new Path[]{path});
- if (files != null) {
- for (FileStatus sub : files) {
- try {
- ret.add(new HdfsBlobStoreFile(sub.getPath().getParent(), sub.getPath().getName(),
- _hadoopConf));
- } catch (IllegalArgumentException e) {
- // Ignore: the file name did not match the expected pattern
- LOG.warn("Found an unexpected file in {} {}", path, sub.getPath().getName());
- }
- }
- }
- return ret.iterator();
- }
-
- protected Iterator<String> listKeys(Path path) throws IOException {
- ArrayList<String> ret = new ArrayList<String>();
- FileStatus[] files = _fs.listStatus(new Path[]{path});
- if (files != null) {
- for (FileStatus sub : files) {
- // getName() already returns a String, and nothing here can throw IllegalArgumentException
- ret.add(sub.getPath().getName());
- }
- }
- return ret.iterator();
- }
-
- protected int getBlobReplication(String key) throws IOException {
- Path path = getKeyDir(key);
- Path dest = new Path(path, BLOBSTORE_DATA);
- return _fs.getFileStatus(dest).getReplication();
- }
-
- protected int updateBlobReplication(String key, int replication) throws IOException {
- Path path = getKeyDir(key);
- Path dest = new Path(path, BLOBSTORE_DATA);
- _fs.setReplication(dest, (short) replication);
- return _fs.getFileStatus(dest).getReplication();
- }
-
- protected void delete(Path path) throws IOException {
- _fs.delete(path, true);
- }
-
- public void shutdown() {
- if (timer != null) {
- timer.cancel();
- }
- }
-}
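
The getKeyDir() above spreads keys over BUCKETS (1024) hash directories so no single HDFS directory accumulates every key, and KeyInHashDirIterator walks those buckets in order to enumerate keys. A sketch of the same mapping, with an illustrative base dir and key:

    import org.apache.hadoop.fs.Path;

    public class KeyBucketingSketch {
        static final int BUCKETS = 1024; // mirrors HdfsBlobStoreImpl.BUCKETS

        public static void main(String[] args) {
            Path basePath = new Path("/storm/blobstore"); // illustrative base dir
            String key = "my-key";                        // illustrative key
            String bucket = String.valueOf(Math.abs((long) key.hashCode()) % BUCKETS);
            // Every key lives at <base>/<bucket>/<key>; the iterator walks buckets 0..1023.
            System.out.println(new Path(new Path(basePath, bucket), key));
        }
    }
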
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
deleted file mode 100644
index e68d5e3..0000000
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.blobstore;
-
-import org.apache.storm.blobstore.AtomicOutputStream;
-import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.blobstore.InputStreamWithMeta;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.ReadableBlobMeta;
-import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.storm.generated.KeyAlreadyExistsException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.utils.NimbusClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * Client to access the HDFS blob store. At this point it is meant to be used only by the
- * supervisor. We don't trust who the client says they are, so null is passed for all Subjects.
- *
- * The HdfsBlobStore implementation takes care of the null Subjects. It assigns Subjects
- * based on who Hadoop says the user is. These users must be configured accordingly
- * in SUPERVISOR_ADMINS for ACL validation and for the supervisors to download the blobs.
- * This API is only used by the supervisor in order to talk directly to HDFS.
- */
-public class HdfsClientBlobStore extends ClientBlobStore {
- private static final Logger LOG = LoggerFactory.getLogger(HdfsClientBlobStore.class);
- private HdfsBlobStore _blobStore;
- private Map<String, Object> _conf;
- private NimbusClient client;
-
- @Override
- public void prepare(Map<String, Object> conf) {
- this._conf = conf;
- _blobStore = new HdfsBlobStore();
- _blobStore.prepare(conf, null, null);
- }
-
- @Override
- public AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta)
- throws AuthorizationException, KeyAlreadyExistsException {
- return _blobStore.createBlob(key, meta, null);
- }
-
- @Override
- public AtomicOutputStream updateBlob(String key)
- throws AuthorizationException, KeyNotFoundException {
- return _blobStore.updateBlob(key, null);
- }
-
- @Override
- public ReadableBlobMeta getBlobMeta(String key)
- throws AuthorizationException, KeyNotFoundException {
- return _blobStore.getBlobMeta(key, null);
- }
-
- @Override
- public void setBlobMetaToExtend(String key, SettableBlobMeta meta)
- throws AuthorizationException, KeyNotFoundException {
- _blobStore.setBlobMeta(key, meta, null);
- }
-
- @Override
- public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException {
- _blobStore.deleteBlob(key, null);
- }
-
- @Override
- public InputStreamWithMeta getBlob(String key)
- throws AuthorizationException, KeyNotFoundException {
- return _blobStore.getBlob(key, null);
- }
-
- @Override
- public Iterator<String> listKeys() {
- return _blobStore.listKeys();
- }
-
- @Override
- public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException {
- return _blobStore.getBlobReplication(key, null);
- }
-
- @Override
- public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException {
- return _blobStore.updateBlobReplication(key, replication, null);
- }
-
- @Override
- public boolean setClient(Map<String, Object> conf, NimbusClient client) {
- this.client = client;
- return true;
- }
-
- @Override
- public void createStateInZookeeper(String key) {
- // Do nothing
- }
-
- @Override
- public void shutdown() {
- close();
- }
-
- @Override
- public void close() {
- if (client != null) {
- client.close();
- client = null;
- }
- }
-}
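
As the Javadoc above notes, this client is for the supervisor to read blobs straight from HDFS, passing a null Subject and letting HdfsBlobStore resolve the Hadoop user. A minimal download sketch; the conf value and key name are hypothetical.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.Config;
    import org.apache.storm.blobstore.InputStreamWithMeta;
    import org.apache.storm.hdfs.blobstore.HdfsClientBlobStore;

    public class SupervisorDownloadSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> conf = new HashMap<>();
            conf.put(Config.BLOBSTORE_DIR, "hdfs://namenode:8020/storm/blobstore"); // hypothetical
            HdfsClientBlobStore client = new HdfsClientBlobStore();
            client.prepare(conf); // wraps an HdfsBlobStore prepared with a null NimbusInfo
            long total = 0;
            try (InputStreamWithMeta in = client.getBlob("my-key")) { // hypothetical key
                byte[] buf = new byte[4096];
                int len;
                while ((len = in.read(buf)) > 0) {
                    total += len; // the Hadoop user needs READ access, e.g. via SUPERVISOR_ADMINS
                }
            }
            System.out.println("Read " + total + " bytes");
            client.shutdown();
        }
    }
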
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
deleted file mode 100644
index 19ff38c..0000000
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
+++ /dev/null
@@ -1,544 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.blobstore;
-
-import org.apache.storm.Config;
-import org.apache.storm.blobstore.AtomicOutputStream;
-import org.apache.storm.blobstore.BlobStore;
-import org.apache.storm.blobstore.BlobStoreAclHandler;
-import org.apache.storm.generated.AccessControl;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.storm.generated.AccessControlType;
-
-import org.apache.storm.security.auth.FixedGroupsMapping;
-import org.apache.storm.security.auth.NimbusPrincipal;
-import org.apache.storm.security.auth.SingleUserPrincipal;
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.Subject;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.UUID;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Iterator;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
-import org.junit.ClassRule;
-
-public class BlobStoreTest {
-
- @ClassRule
- public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();
-
- private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class);
- URI base;
- File baseFile;
- private static final Map<String, Object> CONF = new HashMap<>();
- public static final int READ = 0x01;
- public static final int WRITE = 0x02;
- public static final int ADMIN = 0x04;
-
- @Before
- public void init() {
- initializeConfigs();
- baseFile = new File("/tmp/blob-store-test-" + UUID.randomUUID());
- base = baseFile.toURI();
- }
-
- @After
- public void cleanup()
- throws IOException {
- FileUtils.deleteDirectory(baseFile);
- }
-
- // Initializes the configs with a nimbus admin, an admin group, and a supervisor user
- public static void initializeConfigs() {
- CONF.put(Config.NIMBUS_ADMINS, "admin");
- CONF.put(Config.NIMBUS_ADMINS_GROUPS, "adminsGroup");
-
- // Construct a groups mapping for the FixedGroupsMapping class
- Map<String, Set<String>> groupsMapping = new HashMap<>();
- Set<String> groupSet = new HashSet<>();
- groupSet.add("adminsGroup");
- groupsMapping.put("adminsGroupsUser", groupSet);
-
- // Now create a params map to put it in to our conf
- Map<String, Object> paramMap = new HashMap<>();
- paramMap.put(FixedGroupsMapping.STORM_FIXED_GROUP_MAPPING, groupsMapping);
- CONF.put(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN, "org.apache.storm.security.auth.FixedGroupsMapping");
- CONF.put(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS, paramMap);
- CONF.put(Config.NIMBUS_SUPERVISOR_USERS, "supervisor");
- }
-
- //Gets Nimbus Subject with NimbusPrincipal set on it
- public static Subject getNimbusSubject() {
- Subject nimbus = new Subject();
- nimbus.getPrincipals().add(new NimbusPrincipal());
- return nimbus;
- }
-
- // Overloading the assertStoreHasExactly method to accommodate a Subject in order to check for authorization
- public static void assertStoreHasExactly(BlobStore store, Subject who, String... keys)
- throws IOException, KeyNotFoundException, AuthorizationException {
- Set<String> expected = new HashSet<>(Arrays.asList(keys));
- Set<String> found = new HashSet<>();
- Iterator<String> c = store.listKeys();
- while (c.hasNext()) {
- String keyName = c.next();
- found.add(keyName);
- }
- Set<String> extra = new HashSet<>(found);
- extra.removeAll(expected);
- assertTrue("Found extra keys in the blob store " + extra, extra.isEmpty());
- Set<String> missing = new HashSet<>(expected);
- missing.removeAll(found);
- assertTrue("Found keys missing from the blob store " + missing, missing.isEmpty());
- }
-
- public static void assertStoreHasExactly(BlobStore store, String... keys)
- throws IOException, KeyNotFoundException, AuthorizationException {
- assertStoreHasExactly(store, null, keys);
- }
-
- // Overloading the readInt method to accommodate a Subject in order to check for authorization (security turned on)
- public static int readInt(BlobStore store, Subject who, String key) throws IOException, KeyNotFoundException, AuthorizationException {
- try (InputStream in = store.getBlob(key, who)) {
- return in.read();
- }
- }
-
- public static int readInt(BlobStore store, String key)
- throws IOException, KeyNotFoundException, AuthorizationException {
- return readInt(store, null, key);
- }
-
- public static void readAssertEquals(BlobStore store, String key, int value)
- throws IOException, KeyNotFoundException, AuthorizationException {
- assertEquals(value, readInt(store, key));
- }
-
- // Reads a blob as the given Subject and asserts on its value (for checks with security turned on)
- public void readAssertEqualsWithAuth(BlobStore store, Subject who, String key, int value)
- throws IOException, KeyNotFoundException, AuthorizationException {
- assertEquals(value, readInt(store, who, key));
- }
-
- private AutoCloseableBlobStoreContainer initHdfs(String dirName)
- throws Exception {
- Map<String, Object> conf = new HashMap<>();
- conf.put(Config.BLOBSTORE_DIR, dirName);
- conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
- conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3);
- HdfsBlobStore store = new HdfsBlobStore();
- store.prepareInternal(conf, null, DFS_CLUSTER_RULE.getDfscluster().getConfiguration(0));
- return new AutoCloseableBlobStoreContainer(store);
- }
-
- private static class AutoCloseableBlobStoreContainer implements AutoCloseable {
-
- private final HdfsBlobStore blobStore;
-
- public AutoCloseableBlobStoreContainer(HdfsBlobStore blobStore) {
- this.blobStore = blobStore;
- }
-
- @Override
- public void close() throws Exception {
- this.blobStore.shutdown();
- }
-
- }
-
- @Test
- public void testHdfsReplication()
- throws Exception {
- try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstoreReplication")) {
- testReplication("/storm/blobstoreReplication/test", container.blobStore);
- }
- }
-
- @Test
- public void testBasicHdfs()
- throws Exception {
- try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore1")) {
- testBasic(container.blobStore);
- }
- }
-
- @Test
- public void testMultipleHdfs()
- throws Exception {
- // use a different blobstore dir so it doesn't conflict with the other tests
- try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore2")) {
- testMultiple(container.blobStore);
- }
- }
-
- @Test
- public void testHdfsWithAuth()
- throws Exception {
- // use different blobstore dir so it doesn't conflict with other tests
- try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore3")) {
- testWithAuthentication(container.blobStore);
- }
- }
-
- // Test for replication.
- public void testReplication(String path, BlobStore store)
- throws Exception {
- SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
- metadata.set_replication_factor(4);
- try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
- out.write(1);
- }
- assertStoreHasExactly(store, "test");
- assertEquals("Blobstore replication not matching", store.getBlobReplication("test", null), 4);
- store.deleteBlob("test", null);
-
- //Test for replication with NIMBUS as user
- Subject admin = getSubject("admin");
- metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- metadata.set_replication_factor(4);
- try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) {
- out.write(1);
- }
- assertStoreHasExactly(store, "test");
- assertEquals("Blobstore replication not matching", store.getBlobReplication("test", admin), 4);
- store.updateBlobReplication("test", 5, admin);
- assertEquals("Blobstore replication not matching", store.getBlobReplication("test", admin), 5);
- store.deleteBlob("test", admin);
-
- //Test for replication using SUPERVISOR access
- Subject supervisor = getSubject("supervisor");
- metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- metadata.set_replication_factor(4);
- try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) {
- out.write(1);
- }
- assertStoreHasExactly(store, "test");
- assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 4);
- store.updateBlobReplication("test", 5, supervisor);
- assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 5);
- store.deleteBlob("test", supervisor);
-
- Subject adminsGroupsUser = getSubject("adminsGroupsUser");
- metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- metadata.set_replication_factor(4);
- try (AtomicOutputStream out = store.createBlob("test", metadata, adminsGroupsUser)) {
- out.write(1);
- }
- assertStoreHasExactly(store, "test");
- assertEquals("Blobstore replication not matching", store.getBlobReplication("test", adminsGroupsUser), 4);
- store.updateBlobReplication("test", 5, adminsGroupsUser);
- assertEquals("Blobstore replication not matching", store.getBlobReplication("test", adminsGroupsUser), 5);
- store.deleteBlob("test", adminsGroupsUser);
-
- //Test for a user having read or write or admin access to read replication for a blob
- String createSubject = "createSubject";
- String writeSubject = "writeSubject";
- String adminSubject = "adminSubject";
- Subject who = getSubject(createSubject);
- AccessControl writeAccess = new AccessControl(AccessControlType.USER, READ);
- AccessControl adminAccess = new AccessControl(AccessControlType.USER, ADMIN);
- writeAccess.set_name(writeSubject);
- adminAccess.set_name(adminSubject);
- List<AccessControl> acl = Arrays.asList(writeAccess, adminAccess);
- metadata = new SettableBlobMeta(acl);
- metadata.set_replication_factor(4);
- try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
- out.write(1);
- }
- assertStoreHasExactly(store, "test");
- who = getSubject(writeSubject);
- assertEquals("Blobstore replication not matching", store.getBlobReplication("test", who), 4);
-
- //Test for a user having WRITE or ADMIN privileges to change replication of a blob
- who = getSubject(adminSubject);
- store.updateBlobReplication("test", 5, who);
- assertEquals("Blobstore replication not matching", store.getBlobReplication("test", who), 5);
- store.deleteBlob("test", getSubject(createSubject));
- }
-
- public Subject getSubject(String name) {
- Subject subject = new Subject();
- SingleUserPrincipal user = new SingleUserPrincipal(name);
- subject.getPrincipals().add(user);
- return subject;
- }
-
- // Check for Blobstore with authentication
- public void testWithAuthentication(BlobStore store)
- throws Exception {
- //Test for Nimbus Admin
- Subject admin = getSubject("admin");
- assertStoreHasExactly(store);
- SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) {
- assertStoreHasExactly(store, "test");
- out.write(1);
- }
- store.deleteBlob("test", admin);
-
- //Test for Nimbus Groups Admin
- Subject adminsGroupsUser = getSubject("adminsGroupsUser");
- assertStoreHasExactly(store);
- metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- try (AtomicOutputStream out = store.createBlob("test", metadata, adminsGroupsUser)) {
- assertStoreHasExactly(store, "test");
- out.write(1);
- }
- store.deleteBlob("test", adminsGroupsUser);
-
- //Test for Supervisor Admin
- Subject supervisor = getSubject("supervisor");
- assertStoreHasExactly(store);
- metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) {
- assertStoreHasExactly(store, "test");
- out.write(1);
- }
- store.deleteBlob("test", supervisor);
-
- //Test for Nimbus itself as a user
- Subject nimbus = getNimbusSubject();
- assertStoreHasExactly(store);
- metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- try (AtomicOutputStream out = store.createBlob("test", metadata, nimbus)) {
- assertStoreHasExactly(store, "test");
- out.write(1);
- }
- store.deleteBlob("test", nimbus);
-
- // Test with a dummy test_subject for cases where subject !=null (security turned on)
- Subject who = getSubject("test_subject");
- assertStoreHasExactly(store);
-
- // Tests for case when subject != null (security turned on) and
- // acls for the blob are set to WORLD_EVERYTHING
- metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
- try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
- out.write(1);
- }
- assertStoreHasExactly(store, "test");
- // Testing whether acls are set to WORLD_EVERYTHING
- assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
- readAssertEqualsWithAuth(store, who, "test", 1);
-
- LOG.info("Deleting test");
- store.deleteBlob("test", who);
- assertStoreHasExactly(store);
-
- // Tests for case when subject != null (security turned on) and
- // acls are not set for the blob (DEFAULT)
- LOG.info("Creating test again");
- metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
- out.write(2);
- }
- assertStoreHasExactly(store, "test");
- // Testing that the acls are NOT set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because
- // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING, as the user needs to have
- // complete access to the blob
- assertTrue("ACL unexpectedly contains WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
- readAssertEqualsWithAuth(store, who, "test", 2);
-
- LOG.info("Updating test");
- try (AtomicOutputStream out = store.updateBlob("test", who)) {
- out.write(3);
- }
- assertStoreHasExactly(store, "test");
- readAssertEqualsWithAuth(store, who, "test", 3);
-
- LOG.info("Updating test again");
- try (AtomicOutputStream out = store.updateBlob("test", who)) {
- out.write(4);
- }
- LOG.info("SLEEPING");
- Thread.sleep(2);
- assertStoreHasExactly(store, "test");
- readAssertEqualsWithAuth(store, who, "test", 3);
-
- //Test for subject with no principals and acls set to WORLD_EVERYTHING
- who = new Subject();
- metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
- LOG.info("Creating test");
- try (AtomicOutputStream out = store.createBlob("test-empty-subject-WE", metadata, who)) {
- out.write(2);
- }
- assertStoreHasExactly(store, "test-empty-subject-WE", "test");
- // Testing whether acls are set to WORLD_EVERYTHING
- assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
- readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
-
- //Test for subject with no principals and acls set to DEFAULT
- who = new Subject();
- metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- LOG.info("Creating other");
- try (AtomicOutputStream out = store.createBlob("test-empty-subject-DEF", metadata, who)) {
- out.write(2);
- }
- assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE");
- // Testing whether acls are set to WORLD_EVERYTHING
- assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
- readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
-
- if (store instanceof HdfsBlobStore) {
- ((HdfsBlobStore) store).fullCleanup(1);
- } else {
- fail("Error the blobstore is of unknowntype");
- }
- }
-
- public void testBasic(BlobStore store)
- throws Exception {
- assertStoreHasExactly(store);
- LOG.info("Creating test");
- // Tests for case when subject == null (security turned off) and
- // acls for the blob are set to WORLD_EVERYTHING
- SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
- try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
- out.write(1);
- }
- assertStoreHasExactly(store, "test");
- // Testing whether acls are set to WORLD_EVERYTHING
- assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
- readAssertEquals(store, "test", 1);
-
- LOG.info("Deleting test");
- store.deleteBlob("test", null);
- assertStoreHasExactly(store);
-
- // The following tests are run for both hdfs and local store to test the
- // update blob interface
- metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
- LOG.info("Creating test again");
- try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
- out.write(2);
- }
- assertStoreHasExactly(store, "test");
- readAssertEquals(store, "test", 2);
- LOG.info("Updating test");
- try (AtomicOutputStream out = store.updateBlob("test", null)) {
- out.write(3);
- }
- assertStoreHasExactly(store, "test");
- readAssertEquals(store, "test", 3);
-
- LOG.info("Updating test again");
- try (AtomicOutputStream out = store.updateBlob("test", null)) {
- out.write(4);
- }
- LOG.info("SLEEPING");
- Thread.sleep(2);
-
- if (store instanceof HdfsBlobStore) {
- ((HdfsBlobStore) store).fullCleanup(1);
- } else {
- fail("Error the blobstore is of unknowntype");
- }
- }
-
- public void testMultiple(BlobStore store)
- throws Exception {
- assertStoreHasExactly(store);
- LOG.info("Creating test");
- try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), null)) {
- out.write(1);
- }
- assertStoreHasExactly(store, "test");
- readAssertEquals(store, "test", 1);
-
- LOG.info("Creating other");
- try (AtomicOutputStream out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
- null)) {
- out.write(2);
- }
- assertStoreHasExactly(store, "test", "other");
- readAssertEquals(store, "test", 1);
- readAssertEquals(store, "other", 2);
-
- LOG.info("Updating other");
- try (AtomicOutputStream out = store.updateBlob("other", null)) {
- out.write(5);
- }
- assertStoreHasExactly(store, "test", "other");
- readAssertEquals(store, "test", 1);
- readAssertEquals(store, "other", 5);
-
- LOG.info("Deleting test");
- store.deleteBlob("test", null);
- assertStoreHasExactly(store, "other");
- readAssertEquals(store, "other", 5);
-
- LOG.info("Creating test again");
- try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
- null)) {
- out.write(2);
- }
- assertStoreHasExactly(store, "test", "other");
- readAssertEquals(store, "test", 2);
- readAssertEquals(store, "other", 5);
-
- LOG.info("Updating test");
- try (AtomicOutputStream out = store.updateBlob("test", null)) {
- out.write(3);
- }
- assertStoreHasExactly(store, "test", "other");
- readAssertEquals(store, "test", 3);
- readAssertEquals(store, "other", 5);
-
- LOG.info("Deleting other");
- store.deleteBlob("other", null);
- assertStoreHasExactly(store, "test");
- readAssertEquals(store, "test", 3);
-
- LOG.info("Updating test again");
- try (AtomicOutputStream out = store.updateBlob("test", null)) {
- out.write(4);
- }
- LOG.info("SLEEPING");
- Thread.sleep(2);
-
- if (store instanceof HdfsBlobStore) {
- ((HdfsBlobStore) store).fullCleanup(1);
- } else {
- fail("Error the blobstore is of unknowntype");
- }
- assertStoreHasExactly(store, "test");
- readAssertEquals(store, "test", 3);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
deleted file mode 100644
index 3628c79..0000000
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.blobstore;
-
-import org.apache.storm.blobstore.BlobStoreFile;
-import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import static org.junit.Assert.*;
-
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
-import org.junit.ClassRule;
-
-public class HdfsBlobStoreImplTest {
-
- @ClassRule
- public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();
-
- private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImplTest.class);
-
- // key dir needs to be number 0 to number of buckets, choose one so we know where to look
- private static String KEYDIR = "0";
- private Path blobDir = new Path("/storm/blobstore1");
- private Path fullKeyDir = new Path(blobDir, KEYDIR);
- private String BLOBSTORE_DATA = "data";
-
- public class TestHdfsBlobStoreImpl extends HdfsBlobStoreImpl implements AutoCloseable {
-
- public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws IOException {
- super(path, conf);
- }
-
- public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf,
- Configuration hconf) throws IOException {
- super(path, conf, hconf);
- }
-
- protected Path getKeyDir(String key) {
- return new Path(new Path(blobDir, KEYDIR), key);
- }
-
- @Override
- public void close() throws Exception {
- this.shutdown();
- }
- }
-
- // Be careful about adding additional tests as the dfscluster will be shared
- @Test
- public void testMultiple() throws Exception {
- String testString = "testingblob";
- String validKey = "validkeyBasic";
-
- //Will be closed automatically when shutting down the DFS cluster
- FileSystem fs = DFS_CLUSTER_RULE.getDfscluster().getFileSystem();
- Map<String, Object> conf = new HashMap<>();
-
- try (TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, DFS_CLUSTER_RULE.getHadoopConf())) {
- // should have created blobDir
- assertTrue("BlobStore dir wasn't created", fs.exists(blobDir));
- assertEquals("BlobStore dir was created with wrong permissions",
- HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION, fs.getFileStatus(blobDir).getPermission());
-
- // test exist with non-existent key
- assertFalse("file exists but shouldn't", hbs.exists("bogus"));
-
- // test write
- BlobStoreFile pfile = hbs.write(validKey, false);
- // Adding metadata to avoid null pointer exception
- SettableBlobMeta meta = new SettableBlobMeta();
- meta.set_replication_factor(1);
- pfile.setMetadata(meta);
- try (OutputStream ios = pfile.getOutputStream()) {
- ios.write(testString.getBytes(StandardCharsets.UTF_8));
- }
-
- // test commit creates properly
- assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir));
- pfile.commit();
- Path dataFile = new Path(new Path(fullKeyDir, validKey), BLOBSTORE_DATA);
- assertTrue("blob data not committed", fs.exists(dataFile));
- assertEquals("BlobStore dir was created with wrong permissions",
- HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission());
- assertTrue("key doesn't exist but should", hbs.exists(validKey));
-
- // test read
- BlobStoreFile readpFile = hbs.read(validKey);
- try (InputStream inStream = readpFile.getInputStream()) {
- String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
- assertEquals("string read from blob doesn't match", testString, readString);
- }
-
- // test listkeys
- Iterator<String> keys = hbs.listKeys();
- assertTrue("blob has one key", keys.hasNext());
- assertEquals("one key in blobstore", validKey, keys.next());
-
- // delete
- hbs.deleteKey(validKey);
- assertFalse("key not deleted", fs.exists(dataFile));
- assertFalse("key not deleted", hbs.exists(validKey));
-
- // Now do multiple
- String testString2 = "testingblob2";
- String validKey2 = "validkey2";
-
- // test write
- pfile = hbs.write(validKey, false);
- pfile.setMetadata(meta);
- try (OutputStream ios = pfile.getOutputStream()) {
- ios.write(testString.getBytes(StandardCharsets.UTF_8));
- }
-
- // test commit creates properly
- assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir));
- pfile.commit();
- assertTrue("blob data not committed", fs.exists(dataFile));
- assertEquals("BlobStore dir was created with wrong permissions",
- HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission());
- assertTrue("key doesn't exist but should", hbs.exists(validKey));
-
- // test write again
- pfile = hbs.write(validKey2, false);
- pfile.setMetadata(meta);
- try (OutputStream ios2 = pfile.getOutputStream()) {
- ios2.write(testString2.getBytes(StandardCharsets.UTF_8));
- }
-
- // test commit second creates properly
- pfile.commit();
- Path dataFile2 = new Path(new Path(fullKeyDir, validKey2), BLOBSTORE_DATA);
- assertTrue("blob data not committed", fs.exists(dataFile2));
- assertEquals("BlobStore dir was created with wrong permissions",
- HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile2).getPermission());
- assertTrue("key doesn't exist but should", hbs.exists(validKey2));
-
- // test listkeys
- keys = hbs.listKeys();
- int total = 0;
- boolean key1Found = false;
- boolean key2Found = false;
- while (keys.hasNext()) {
- total++;
- String key = keys.next();
- if (key.equals(validKey)) {
- key1Found = true;
- } else if (key.equals(validKey2)) {
- key2Found = true;
- } else {
- fail("Found key that wasn't expected: " + key);
- }
- }
- assertEquals("number of keys is wrong", 2, total);
- assertTrue("blobstore missing key1", key1Found);
- assertTrue("blobstore missing key2", key2Found);
-
- // test read
- readpFile = hbs.read(validKey);
- try (InputStream inStream = readpFile.getInputStream()) {
- String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
- assertEquals("string read from blob doesn't match", testString, readString);
- }
-
- // test read
- readpFile = hbs.read(validKey2);
- try (InputStream inStream = readpFile.getInputStream()) {
- String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
- assertEquals("string read from blob doesn't match", testString2, readString);
- }
-
- hbs.deleteKey(validKey);
- assertFalse("key not deleted", hbs.exists(validKey));
- hbs.deleteKey(validKey2);
- assertFalse("key not deleted", hbs.exists(validKey2));
- }
- }
-
- @Test
- public void testGetFileLength() throws Exception {
- Map<String, Object> conf = new HashMap<>();
- String validKey = "validkeyBasic";
- String testString = "testingblob";
- try (TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, DFS_CLUSTER_RULE.getHadoopConf())) {
- BlobStoreFile pfile = hbs.write(validKey, false);
- // Adding metadata to avoid null pointer exception
- SettableBlobMeta meta = new SettableBlobMeta();
- meta.set_replication_factor(1);
- pfile.setMetadata(meta);
- try (OutputStream ios = pfile.getOutputStream()) {
- ios.write(testString.getBytes(StandardCharsets.UTF_8));
- }
- assertEquals(testString.getBytes(StandardCharsets.UTF_8).length, pfile.getFileLength());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ccace42..dee254e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -351,6 +351,7 @@
<module>external/storm-autocreds</module>
<module>external/storm-kafka</module>
<module>external/storm-hdfs</module>
+ <module>external/storm-hdfs-blobstore</module>
<module>external/storm-hbase</module>
<module>external/storm-hive</module>
<module>external/storm-jdbc</module>
[2/3] storm git commit: [STORM-2916] separate hdfs-blobstore from storm-hdfs
Posted by ka...@apache.org.
[STORM-2916] separate hdfs-blobstore from storm-hdfs
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f58d4729
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f58d4729
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f58d4729
Branch: refs/heads/master
Commit: f58d472994297d240af8a478d660c3be8a69bd97
Parents: f37a6bd
Author: Ethan Li <et...@gmail.com>
Authored: Mon Jan 29 16:15:01 2018 -0600
Committer: Ethan Li <et...@gmail.com>
Committed: Tue Jan 30 15:05:38 2018 -0600
----------------------------------------------------------------------
external/storm-blobstore-migration/pom.xml | 2 +-
external/storm-hdfs-blobstore/pom.xml | 251 +++++++++
.../storm/hdfs/blobstore/HdfsBlobStore.java | 394 ++++++++++++++
.../storm/hdfs/blobstore/HdfsBlobStoreFile.java | 196 +++++++
.../storm/hdfs/blobstore/HdfsBlobStoreImpl.java | 312 +++++++++++
.../hdfs/blobstore/HdfsClientBlobStore.java | 130 +++++
.../storm/hdfs/blobstore/BlobStoreTest.java | 540 ++++++++++++++++++
.../hdfs/blobstore/HdfsBlobStoreImplTest.java | 224 ++++++++
.../storm/hdfs/blobstore/HdfsBlobStore.java | 395 --------------
.../storm/hdfs/blobstore/HdfsBlobStoreFile.java | 196 -------
.../storm/hdfs/blobstore/HdfsBlobStoreImpl.java | 312 -----------
.../hdfs/blobstore/HdfsClientBlobStore.java | 130 -----
.../storm/hdfs/blobstore/BlobStoreTest.java | 544 -------------------
.../hdfs/blobstore/HdfsBlobStoreImplTest.java | 226 --------
pom.xml | 1 +
15 files changed, 2049 insertions(+), 1804 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-blobstore-migration/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-blobstore-migration/pom.xml b/external/storm-blobstore-migration/pom.xml
index c530eec..bd1af90 100644
--- a/external/storm-blobstore-migration/pom.xml
+++ b/external/storm-blobstore-migration/pom.xml
@@ -47,7 +47,7 @@ limitations under the License.
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-hdfs</artifactId>
+ <artifactId>storm-hdfs-blobstore</artifactId>
<version>${project.version}</version>
<exclusions>
<!--log4j-over-slf4j must be excluded for hadoop-minicluster
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs-blobstore/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hdfs-blobstore/pom.xml b/external/storm-hdfs-blobstore/pom.xml
new file mode 100644
index 0000000..6383e11
--- /dev/null
+++ b/external/storm-hdfs-blobstore/pom.xml
@@ -0,0 +1,251 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-hdfs-blobstore</artifactId>
+
+ <developers>
+ <developer>
+ <id>ptgoetz</id>
+ <name>P. Taylor Goetz</name>
+ <email>ptgoetz@gmail.com</email>
+ </developer>
+ </developers>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-client</artifactId>
+ <version>${project.version}</version>
+ <scope>${provided.scope}</scope>
+ <exclusions>
+ <!--log4j-over-slf4j must be excluded for hadoop-minicluster
+ see: http://stackoverflow.com/q/20469026/3542091 -->
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minikdc</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.directory.server</groupId>
+ <artifactId>apacheds-kerberos-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hdfs</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <reuseForks>false</reuseForks>
+ <forkCount>1</forkCount>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version>
+ <executions>
+ <execution>
+ <id>cleanup</id>
+ <phase>clean</phase>
+ <goals>
+ <goal>clean</goal>
+ </goals>
+ <configuration>
+ <excludeDefaultDirectories>true</excludeDefaultDirectories>
+ <filesets>
+ <fileset>
+ <directory>./build/</directory>
+ </fileset>
+ </filesets>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <!--Note - the version would be inherited-->
+ <configuration>
+ <maxAllowedViolations>80</maxAllowedViolations>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
new file mode 100644
index 0000000..a4b108f
--- /dev/null
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.blobstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.AtomicOutputStream;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.blobstore.BlobStoreFile;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.storm.blobstore.BlobStoreAclHandler.*;
+
+/**
+ * Provides an HDFS file system backed blob store implementation.
+ * Note that this provides an API for having HDFS be the backing store for the blobstore;
+ * it is not a service/daemon.
+ *
+ * We currently have NIMBUS_ADMINS and SUPERVISOR_ADMINS configuration. NIMBUS_ADMINS are given READ, WRITE and ADMIN
+ * access whereas the SUPERVISOR_ADMINS are given READ access in order to read and download the blobs from the nimbus.
+ *
+ * The ACLs for the blob store are validated against whether the subject is a NIMBUS_ADMIN, SUPERVISOR_ADMIN or USER
+ * who has read, write or admin privileges in order to perform respective operations on the blob.
+ *
+ * For the HDFS blob store:
+ * 1. The USER interacts with nimbus to upload and access blobs through NimbusBlobStore Client API. Here, unlike
+ * local blob store which stores the blobs locally, the nimbus talks to HDFS to upload the blobs.
+ * 2. The USER sets the ACLs, and the blob access is validated against these ACLs.
+ * 3. The SUPERVISOR interacts with nimbus through HdfsClientBlobStore to download the blobs. Here, unlike local
+ * blob store the supervisor interacts with HDFS directly to download the blobs. The call to HdfsBlobStore is made as a "null"
+ * subject. The blobstore gets the hadoop user and validates permissions for the supervisor.
+ */
+public class HdfsBlobStore extends BlobStore {
+ public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStore.class);
+ private static final String DATA_PREFIX = "data_";
+ private static final String META_PREFIX = "meta_";
+ private static final HashMap<String, Subject> alreadyLoggedInUsers = new HashMap<>();
+
+ private BlobStoreAclHandler aclHandler;
+ private HdfsBlobStoreImpl hbs;
+ private Subject localSubject;
+ private Map<String, Object> conf;
+
+ /**
+ * Get the subject from Hadoop so we can use it to validate the acls. There is no direct
+ * interface from UserGroupInformation to get the subject, so do a doAs and get the context.
+ * We could probably run everything in the doAs but for now just grab the subject.
+ */
+ private Subject getHadoopUser() {
+ Subject subj;
+ try {
+ subj = UserGroupInformation.getCurrentUser().doAs(
+ new PrivilegedAction<Subject>() {
+ @Override
+ public Subject run() {
+ return Subject.getSubject(AccessController.getContext());
+ }
+ });
+ } catch (IOException e) {
+ throw new RuntimeException("Error creating subject and logging user in!", e);
+ }
+ return subj;
+ }
+
+ /**
+ * If who is null then we want to use the user Hadoop says we are.
+ * Required for the supervisor to call these routines, as it's not
+ * logged in as anyone.
+ */
+ private Subject checkAndGetSubject(Subject who) {
+ if (who == null) {
+ return localSubject;
+ }
+ return who;
+ }
+
+ @Override
+ public void prepare(Map<String, Object> conf, String overrideBase, NimbusInfo nimbusInfo) {
+ this.conf = conf;
+ prepareInternal(conf, overrideBase, null);
+ }
+
+ /**
+ * Allow a Hadoop Configuration to be passed for testing. If it's null then the hadoop configs
+ * must be in your classpath.
+ */
+ protected void prepareInternal(Map<String, Object> conf, String overrideBase, Configuration hadoopConf) {
+ this.conf = conf;
+ if (overrideBase == null) {
+ overrideBase = (String)conf.get(Config.BLOBSTORE_DIR);
+ }
+ if (overrideBase == null) {
+ throw new RuntimeException("You must specify a blobstore directory for HDFS to use!");
+ }
+ LOG.debug("directory is: {}", overrideBase);
+ try {
+ // If an HDFS keytab/principal has been supplied, log in; otherwise assume we are
+ // already logged in or running insecure HDFS.
+ String principal = (String) conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL);
+ String keyTab = (String) conf.get(Config.BLOBSTORE_HDFS_KEYTAB);
+
+ if (principal != null && keyTab != null) {
+ String combinedKey = principal + " from " + keyTab;
+ synchronized (alreadyLoggedInUsers) {
+ localSubject = alreadyLoggedInUsers.get(combinedKey);
+ if (localSubject == null) {
+ UserGroupInformation.loginUserFromKeytab(principal, keyTab);
+ localSubject = getHadoopUser();
+ alreadyLoggedInUsers.put(combinedKey, localSubject);
+ }
+ }
+ } else {
+ if (principal == null && keyTab != null) {
+ throw new RuntimeException("You must specify an HDFS principal to go with the keytab!");
+ } else if (principal != null && keyTab == null) {
+ throw new RuntimeException("You must specify an HDFS keytab to go with the principal!");
+ }
+ localSubject = getHadoopUser();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error logging in from keytab!", e);
+ }
+ aclHandler = new BlobStoreAclHandler(conf);
+ Path baseDir = new Path(overrideBase, BASE_BLOBS_DIR_NAME);
+ try {
+ if (hadoopConf != null) {
+ hbs = new HdfsBlobStoreImpl(baseDir, conf, hadoopConf);
+ } else {
+ hbs = new HdfsBlobStoreImpl(baseDir, conf);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who)
+ throws AuthorizationException, KeyAlreadyExistsException {
+ if (meta.get_replication_factor() <= 0) {
+ meta.set_replication_factor((int)conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR));
+ }
+ who = checkAndGetSubject(who);
+ validateKey(key);
+ aclHandler.normalizeSettableBlobMeta(key, meta, who, READ | WRITE | ADMIN);
+ BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+ aclHandler.hasPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
+ if (hbs.exists(DATA_PREFIX + key)) {
+ throw new KeyAlreadyExistsException(key);
+ }
+ BlobStoreFileOutputStream mOut = null;
+ try {
+ BlobStoreFile metaFile = hbs.write(META_PREFIX + key, true);
+ metaFile.setMetadata(meta);
+ mOut = new BlobStoreFileOutputStream(metaFile);
+ mOut.write(Utils.thriftSerialize(meta));
+ mOut.close();
+ mOut = null;
+ BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, true);
+ dataFile.setMetadata(meta);
+ return new BlobStoreFileOutputStream(dataFile);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (mOut != null) {
+ try {
+ mOut.cancel();
+ } catch (IOException e) {
+ //Ignored
+ }
+ }
+ }
+ }
+
+ @Override
+ public AtomicOutputStream updateBlob(String key, Subject who)
+ throws AuthorizationException, KeyNotFoundException {
+ who = checkAndGetSubject(who);
+ validateKey(key);
+ SettableBlobMeta meta = getStoredBlobMeta(key);
+ aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
+ try {
+ BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, false);
+ dataFile.setMetadata(meta);
+ return new BlobStoreFileOutputStream(dataFile);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private SettableBlobMeta getStoredBlobMeta(String key) throws KeyNotFoundException {
+ InputStream in = null;
+ try {
+ BlobStoreFile pf = hbs.read(META_PREFIX + key);
+ try {
+ in = pf.getInputStream();
+ } catch (FileNotFoundException fnf) {
+ throw new KeyNotFoundException(key);
+ }
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] buffer = new byte[2048];
+ int len;
+ while ((len = in.read(buffer)) > 0) {
+ out.write(buffer, 0, len);
+ }
+ in.close();
+ in = null;
+ return Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ //Ignored
+ }
+ }
+ }
+ }
+
+ @Override
+ public ReadableBlobMeta getBlobMeta(String key, Subject who)
+ throws AuthorizationException, KeyNotFoundException {
+ who = checkAndGetSubject(who);
+ validateKey(key);
+ SettableBlobMeta meta = getStoredBlobMeta(key);
+ aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key);
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ rbm.set_settable(meta);
+ try {
+ BlobStoreFile pf = hbs.read(DATA_PREFIX + key);
+ rbm.set_version(pf.getModTime());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return rbm;
+ }
+
+ @Override
+ public void setBlobMeta(String key, SettableBlobMeta meta, Subject who)
+ throws AuthorizationException, KeyNotFoundException {
+ if (meta.get_replication_factor() <= 0) {
+ meta.set_replication_factor((int)conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR));
+ }
+ who = checkAndGetSubject(who);
+ validateKey(key);
+ aclHandler.normalizeSettableBlobMeta(key, meta, who, ADMIN);
+ BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+ SettableBlobMeta orig = getStoredBlobMeta(key);
+ aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key);
+ writeMetadata(key, meta);
+ }
+
+ @Override
+ public void deleteBlob(String key, Subject who)
+ throws AuthorizationException, KeyNotFoundException {
+ who = checkAndGetSubject(who);
+ validateKey(key);
+ SettableBlobMeta meta = getStoredBlobMeta(key);
+ aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
+ try {
+ hbs.deleteKey(DATA_PREFIX + key);
+ hbs.deleteKey(META_PREFIX + key);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public InputStreamWithMeta getBlob(String key, Subject who)
+ throws AuthorizationException, KeyNotFoundException {
+ who = checkAndGetSubject(who);
+ validateKey(key);
+ SettableBlobMeta meta = getStoredBlobMeta(key);
+ aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
+ try {
+ return new BlobStoreFileInputStream(hbs.read(DATA_PREFIX + key));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Iterator<String> listKeys() {
+ try {
+ return new KeyTranslationIterator(hbs.listKeys(), DATA_PREFIX);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ //Empty
+ }
+
+ @Override
+ public int getBlobReplication(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
+ who = checkAndGetSubject(who);
+ validateKey(key);
+ SettableBlobMeta meta = getStoredBlobMeta(key);
+ aclHandler.hasAnyPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
+ try {
+ return hbs.getBlobReplication(DATA_PREFIX + key);
+ } catch (IOException exp) {
+ throw new RuntimeException(exp);
+ }
+ }
+
+ @Override
+ public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException {
+ who = checkAndGetSubject(who);
+ validateKey(key);
+ SettableBlobMeta meta = getStoredBlobMeta(key);
+ meta.set_replication_factor(replication);
+ aclHandler.hasAnyPermissions(meta.get_acl(), WRITE | ADMIN, who, key);
+ try {
+ writeMetadata(key, meta);
+ return hbs.updateBlobReplication(DATA_PREFIX + key, replication);
+ } catch (IOException exp) {
+ throw new RuntimeException(exp);
+ }
+ }
+
+ public void writeMetadata(String key, SettableBlobMeta meta)
+ throws AuthorizationException, KeyNotFoundException {
+ BlobStoreFileOutputStream mOut = null;
+ try {
+ BlobStoreFile hdfsFile = hbs.write(META_PREFIX + key, false);
+ hdfsFile.setMetadata(meta);
+ mOut = new BlobStoreFileOutputStream(hdfsFile);
+ mOut.write(Utils.thriftSerialize(meta));
+ mOut.close();
+ mOut = null;
+ } catch (IOException exp) {
+ throw new RuntimeException(exp);
+ } finally {
+ if (mOut != null) {
+ try {
+ mOut.cancel();
+ } catch (IOException e) {
+ //Ignored
+ }
+ }
+ }
+ }
+
+ public void fullCleanup(long age) throws IOException {
+ hbs.fullCleanup(age);
+ }
+}
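
To make the prepare() contract above concrete, here is a rough sketch of wiring up
an HdfsBlobStore by hand. The HDFS URI, principal, and keytab path are placeholders,
and the Hadoop configuration is assumed to be on the classpath:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.Config;
    import org.apache.storm.hdfs.blobstore.HdfsBlobStore;

    public class HdfsBlobStoreSetupSketch {
        public static HdfsBlobStore open() {
            Map<String, Object> conf = new HashMap<>();
            // Required unless an override base directory is passed to prepare().
            conf.put(Config.BLOBSTORE_DIR, "hdfs://namenode:8020/storm/blobstore");
            // Needed because createBlob/setBlobMeta fall back to this when the
            // metadata carries no replication factor.
            conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3);
            // For secure HDFS, principal and keytab must be supplied together:
            // conf.put(Config.BLOBSTORE_HDFS_PRINCIPAL, "storm/host@REALM");
            // conf.put(Config.BLOBSTORE_HDFS_KEYTAB, "/etc/security/storm.keytab");
            HdfsBlobStore store = new HdfsBlobStore();
            store.prepare(conf, null, null);
            return store;
        }
    }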
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
new file mode 100644
index 0000000..3021e66
--- /dev/null
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.storm.blobstore.BlobStoreFile;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.regex.Matcher;
+
+public class HdfsBlobStoreFile extends BlobStoreFile {
+ public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreFile.class);
+
+ private final String _key;
+ private final boolean _isTmp;
+ private final Path _path;
+ private Long _modTime = null;
+ private final boolean _mustBeNew;
+ private final Configuration _hadoopConf;
+ private final FileSystem _fs;
+ private SettableBlobMeta meta;
+
+ // files are world-readable and owner-writable
+ final public static FsPermission BLOBSTORE_FILE_PERMISSION =
+ FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+ public HdfsBlobStoreFile(Path base, String name, Configuration hconf) {
+ if (BLOBSTORE_DATA_FILE.equals(name)) {
+ _isTmp = false;
+ } else {
+ Matcher m = TMP_NAME_PATTERN.matcher(name);
+ if (!m.matches()) {
+ throw new IllegalArgumentException("File name does not match '"+name+"' !~ "+TMP_NAME_PATTERN);
+ }
+ _isTmp = true;
+ }
+ _hadoopConf = hconf;
+ _key = base.getName();
+ _path = new Path(base, name);
+ _mustBeNew = false;
+ try {
+ _fs = _path.getFileSystem(_hadoopConf);
+ } catch (IOException e) {
+ throw new RuntimeException("Error getting filesystem for path: " + _path, e);
+ }
+ }
+
+ public HdfsBlobStoreFile(Path base, boolean isTmp, boolean mustBeNew, Configuration hconf) {
+ _key = base.getName();
+ _hadoopConf = hconf;
+ _isTmp = isTmp;
+ _mustBeNew = mustBeNew;
+ if (_isTmp) {
+ _path = new Path(base, System.currentTimeMillis()+TMP_EXT);
+ } else {
+ _path = new Path(base, BLOBSTORE_DATA_FILE);
+ }
+ try {
+ _fs = _path.getFileSystem(_hadoopConf);
+ } catch (IOException e) {
+ throw new RuntimeException("Error getting filesystem for path: " + _path, e);
+ }
+ }
+
+ @Override
+ public void delete() throws IOException {
+ _fs.delete(_path, true);
+ }
+
+ @Override
+ public boolean isTmp() {
+ return _isTmp;
+ }
+
+ @Override
+ public String getKey() {
+ return _key;
+ }
+
+ @Override
+ public long getModTime() throws IOException {
+ if (_modTime == null) {
+ FileSystem fs = _path.getFileSystem(_hadoopConf);
+ _modTime = fs.getFileStatus(_path).getModificationTime();
+ }
+ return _modTime;
+ }
+
+ // Asserts this is a temporary (uncommitted) blobstore file.
+ private void requireTmp() {
+ if (!isTmp()) {
+ throw new IllegalStateException("Can only operate on a temporary blobstore file.");
+ }
+ }
+
+ // Asserts this is a committed (non-temporary) blobstore file.
+ private void requireNotTmp() {
+ if (isTmp()) {
+ throw new IllegalStateException("Cannot operate on a temporary blobstore file.");
+ }
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ requireNotTmp();
+ return _fs.open(_path);
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ requireTmp();
+ OutputStream out = null;
+ FsPermission fileperms = new FsPermission(BLOBSTORE_FILE_PERMISSION);
+ try {
+ out = _fs.create(_path, (short)this.getMetadata().get_replication_factor());
+ _fs.setPermission(_path, fileperms);
+ _fs.setReplication(_path, (short)this.getMetadata().get_replication_factor());
+ } catch (IOException e) {
+ //Try to create the parent directory, may not work
+ FsPermission dirperms = new FsPermission(HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION);
+ if (!_fs.mkdirs(_path.getParent(), dirperms)) {
+ LOG.warn("error creating parent dir: " + _path.getParent());
+ }
+ out = _fs.create(_path, (short)this.getMetadata().get_replication_factor());
+ _fs.setPermission(_path, fileperms);
+ _fs.setReplication(_path, (short)this.getMetadata().get_replication_factor());
+ }
+ if (out == null) {
+ throw new IOException("Error in creating: " + _path);
+ }
+ return out;
+ }
+
+ @Override
+ public void commit() throws IOException {
+ requireTmp();
+ // FileContext supports atomic rename, whereas FileSystem doesn't
+ FileContext fc = FileContext.getFileContext(_hadoopConf);
+ Path dest = new Path(_path.getParent(), BLOBSTORE_DATA_FILE);
+ if (_mustBeNew) {
+ fc.rename(_path, dest);
+ } else {
+ fc.rename(_path, dest, Options.Rename.OVERWRITE);
+ }
+ // Note, we could add support for setting the replication factor
+ }
+
+ @Override
+ public void cancel() throws IOException {
+ requireTmp();
+ delete();
+ }
+
+ @Override
+ public String toString() {
+ return _path+":"+(_isTmp ? "tmp": BlobStoreFile.BLOBSTORE_DATA_FILE)+":"+_key;
+ }
+
+ @Override
+ public long getFileLength() throws IOException {
+ return _fs.getFileStatus(_path).getLen();
+ }
+
+ @Override
+ public SettableBlobMeta getMetadata() {
+ return meta;
+ }
+
+ @Override
+ public void setMetadata(SettableBlobMeta meta) {
+ this.meta = meta;
+ }
+}
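
The write-to-tmp-then-rename flow implemented above is easiest to see end to end. A
minimal sketch, assuming an already constructed HdfsBlobStoreImpl and a hypothetical
key name:

    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import org.apache.storm.blobstore.BlobStoreFile;
    import org.apache.storm.generated.SettableBlobMeta;
    import org.apache.storm.hdfs.blobstore.HdfsBlobStoreImpl;

    public class CommitFlowSketch {
        public static void writeAndCommit(HdfsBlobStoreImpl impl) throws Exception {
            // write(key, true) hands back a tmp file whose commit must not overwrite.
            BlobStoreFile f = impl.write("some-key", true);
            SettableBlobMeta meta = new SettableBlobMeta();
            meta.set_replication_factor(1); // must be set before getOutputStream()
            f.setMetadata(meta);
            try (OutputStream out = f.getOutputStream()) {
                out.write("payload".getBytes(StandardCharsets.UTF_8));
            }
            // commit() renames the tmp file onto the "data" file via FileContext,
            // which supports atomic rename; readers never see a partial blob.
            f.commit();
        }
    }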
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
new file mode 100644
index 0000000..702a16f
--- /dev/null
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStoreFile;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * HDFS blob store impl.
+ */
+public class HdfsBlobStoreImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImpl.class);
+
+ private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000L;
+ private static final int BUCKETS = 1024;
+ private static final String BLOBSTORE_DATA = "data";
+
+ private Timer timer;
+
+ public class KeyInHashDirIterator implements Iterator<String> {
+ private int currentBucket = 0;
+ private Iterator<String> it = null;
+ private String next = null;
+
+ public KeyInHashDirIterator() throws IOException {
+ primeNext();
+ }
+
+ private void primeNext() throws IOException {
+ while (it == null && currentBucket < BUCKETS) {
+ String name = String.valueOf(currentBucket);
+ Path dir = new Path(_fullPath, name);
+ try {
+ it = listKeys(dir);
+ } catch (FileNotFoundException e) {
+ it = null;
+ }
+ if (it == null || !it.hasNext()) {
+ it = null;
+ currentBucket++;
+ } else {
+ next = it.next();
+ }
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return next != null;
+ }
+
+ @Override
+ public String next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ String current = next;
+ next = null;
+ if (it != null) {
+ if (!it.hasNext()) {
+ it = null;
+ currentBucket++;
+ try {
+ primeNext();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ next = it.next();
+ }
+ }
+ return current;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Delete Not Supported");
+ }
+ }
+
+
+ private Path _fullPath;
+ private FileSystem _fs;
+ private Configuration _hadoopConf;
+
+ // blobstore directory is private!
+ final public static FsPermission BLOBSTORE_DIR_PERMISSION =
+ FsPermission.createImmutable((short) 0700); // rwx------
+
+ public HdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws IOException {
+ this(path, conf, new Configuration());
+ }
+
+ public HdfsBlobStoreImpl(Path path, Map<String, Object> conf,
+ Configuration hconf) throws IOException {
+ LOG.info("Blob store based in {}", path);
+ _fullPath = path;
+ _hadoopConf = hconf;
+ _fs = path.getFileSystem(_hadoopConf);
+
+ if (!_fs.exists(_fullPath)) {
+ FsPermission perms = new FsPermission(BLOBSTORE_DIR_PERMISSION);
+ boolean success = _fs.mkdirs(_fullPath, perms);
+ if (!success) {
+ throw new IOException("Error creating blobstore directory: " + _fullPath);
+ }
+ }
+
+ Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE);
+ if (ObjectReader.getBoolean(shouldCleanup, false)) {
+ LOG.debug("Starting hdfs blobstore cleaner");
+ TimerTask cleanup = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ fullCleanup(FULL_CLEANUP_FREQ);
+ } catch (IOException e) {
+ LOG.error("Error trying to cleanup", e);
+ }
+ }
+ };
+ timer = new Timer("HdfsBlobStore cleanup thread", true);
+ timer.scheduleAtFixedRate(cleanup, 0, FULL_CLEANUP_FREQ);
+ }
+ }
+
+ /**
+ * @return all keys that are available for reading.
+ * @throws IOException on any error.
+ */
+ public Iterator<String> listKeys() throws IOException {
+ return new KeyInHashDirIterator();
+ }
+
+ /**
+ * Get an input stream for reading a part.
+ *
+ * @param key the key of the part to read.
+ * @return the where to read the data from.
+ * @throws IOException on any error
+ */
+ public BlobStoreFile read(String key) throws IOException {
+ return new HdfsBlobStoreFile(getKeyDir(key), BLOBSTORE_DATA, _hadoopConf);
+ }
+
+ /**
+ * Get an object tied to writing the data.
+ *
+ * @param key the key of the part to write to.
+ * @param create whether the file needs to be new or not.
+ * @return an object that can be used to both write to, but also commit/cancel the operation.
+ * @throws IOException on any error
+ */
+ public BlobStoreFile write(String key, boolean create) throws IOException {
+ return new HdfsBlobStoreFile(getKeyDir(key), true, create, _hadoopConf);
+ }
+
+ /**
+ * Check if the key exists in the blob store.
+ *
+ * @param key the key to check for
+ * @return true if it exists else false.
+ */
+ public boolean exists(String key) {
+ Path dir = getKeyDir(key);
+ boolean res = false;
+ try {
+ _fs = dir.getFileSystem(_hadoopConf);
+ res = _fs.exists(dir);
+ } catch (IOException e) {
+ LOG.warn("Exception checking for exists on: " + key);
+ }
+ return res;
+ }
+
+ /**
+ * Delete a key from the blob store
+ *
+ * @param key the key to delete
+ * @throws IOException on any error
+ */
+ public void deleteKey(String key) throws IOException {
+ Path keyDir = getKeyDir(key);
+ HdfsBlobStoreFile pf = new HdfsBlobStoreFile(keyDir, BLOBSTORE_DATA,
+ _hadoopConf);
+ pf.delete();
+ delete(keyDir);
+ }
+
+ protected Path getKeyDir(String key) {
+ String hash = String.valueOf(Math.abs((long) key.hashCode()) % BUCKETS);
+ Path hashDir = new Path(_fullPath, hash);
+
+ Path ret = new Path(hashDir, key);
+ LOG.debug("{} Looking for {} in {}", new Object[]{_fullPath, key, hash});
+ return ret;
+ }
+
+ public void fullCleanup(long age) throws IOException {
+ long cleanUpIfBefore = System.currentTimeMillis() - age;
+ Iterator<String> keys = new KeyInHashDirIterator();
+ while (keys.hasNext()) {
+ String key = keys.next();
+ Path keyDir = getKeyDir(key);
+ Iterator<BlobStoreFile> i = listBlobStoreFiles(keyDir);
+ if (!i.hasNext()) {
+ //The dir is empty, so try to delete it, may fail, but that is OK
+ try {
+ _fs.delete(keyDir, true);
+ } catch (Exception e) {
+ LOG.warn("Could not delete " + keyDir + " will try again later");
+ }
+ }
+ while (i.hasNext()) {
+ BlobStoreFile f = i.next();
+ if (f.isTmp()) {
+ if (f.getModTime() <= cleanUpIfBefore) {
+ f.delete();
+ }
+ }
+ }
+ }
+ }
+
+ protected Iterator<BlobStoreFile> listBlobStoreFiles(Path path) throws IOException {
+ ArrayList<BlobStoreFile> ret = new ArrayList<BlobStoreFile>();
+ FileStatus[] files = _fs.listStatus(new Path[]{path});
+ if (files != null) {
+ for (FileStatus sub : files) {
+ try {
+ ret.add(new HdfsBlobStoreFile(sub.getPath().getParent(), sub.getPath().getName(),
+ _hadoopConf));
+ } catch (IllegalArgumentException e) {
+ // Ignored; the file did not match the expected pattern
+ LOG.warn("Found an unexpected file in {} {}", path, sub.getPath().getName());
+ }
+ }
+ }
+ return ret.iterator();
+ }
+
+ protected Iterator<String> listKeys(Path path) throws IOException {
+ ArrayList<String> ret = new ArrayList<String>();
+ FileStatus[] files = _fs.listStatus(new Path[]{path});
+ if (files != null) {
+ for (FileStatus sub : files) {
+ try {
+ ret.add(sub.getPath().getName());
+ } catch (IllegalArgumentException e) {
+ // Ignored; the file did not match the expected pattern
+ LOG.debug("Found an unexpected file in {} {}", path, sub.getPath().getName());
+ }
+ }
+ }
+ return ret.iterator();
+ }
+
+ protected int getBlobReplication(String key) throws IOException {
+ Path path = getKeyDir(key);
+ Path dest = new Path(path, BLOBSTORE_DATA);
+ return _fs.getFileStatus(dest).getReplication();
+ }
+
+ protected int updateBlobReplication(String key, int replication) throws IOException {
+ Path path = getKeyDir(key);
+ Path dest = new Path(path, BLOBSTORE_DATA);
+ _fs.setReplication(dest, (short) replication);
+ return _fs.getFileStatus(dest).getReplication();
+ }
+
+ protected void delete(Path path) throws IOException {
+ _fs.delete(path, true);
+ }
+
+ public void shutdown() {
+ if (timer != null) {
+ timer.cancel();
+ }
+ }
+}
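
To picture the on-disk layout this class produces: each key is hashed into one of
1024 bucket directories, so a blob ends up at <base>/<bucket>/<key>/data. A tiny
sketch of the same bucket computation used by getKeyDir(), with an illustrative key:

    public class BucketSketch {
        private static final int BUCKETS = 1024;

        // Mirrors getKeyDir(): abs of hashCode, widened to long so that
        // Integer.MIN_VALUE cannot produce a negative bucket, modulo BUCKETS.
        public static String bucketFor(String key) {
            return String.valueOf(Math.abs((long) key.hashCode()) % BUCKETS);
        }

        public static void main(String[] args) {
            // e.g. prints a bucket number in [0, 1023]
            System.out.println(bucketFor("data_mytopology-stormcode.ser"));
        }
    }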
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
new file mode 100644
index 0000000..2cb4dc3
--- /dev/null
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import org.apache.storm.blobstore.AtomicOutputStream;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.utils.NimbusClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Client to access the HDFS blob store. At this point, this is meant to be used only by the
+ * supervisor. Don't trust who the client says they are, so pass null for all Subjects.
+ *
+ * The HdfsBlobStore implementation takes care of the null Subjects. It assigns Subjects
+ * based on the user Hadoop reports. These users must be configured accordingly
+ * in SUPERVISOR_ADMINS for ACL validation and for the supervisors to download the blobs.
+ * This API is only used by the supervisor in order to talk directly to HDFS.
+ */
+public class HdfsClientBlobStore extends ClientBlobStore {
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsClientBlobStore.class);
+ private HdfsBlobStore _blobStore;
+ private Map<String, Object> _conf;
+ private NimbusClient client;
+
+ @Override
+ public void prepare(Map<String, Object> conf) {
+ this._conf = conf;
+ _blobStore = new HdfsBlobStore();
+ _blobStore.prepare(conf, null, null);
+ }
+
+ @Override
+ public AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta)
+ throws AuthorizationException, KeyAlreadyExistsException {
+ return _blobStore.createBlob(key, meta, null);
+ }
+
+ @Override
+ public AtomicOutputStream updateBlob(String key)
+ throws AuthorizationException, KeyNotFoundException {
+ return _blobStore.updateBlob(key, null);
+ }
+
+ @Override
+ public ReadableBlobMeta getBlobMeta(String key)
+ throws AuthorizationException, KeyNotFoundException {
+ return _blobStore.getBlobMeta(key, null);
+ }
+
+ @Override
+ public void setBlobMetaToExtend(String key, SettableBlobMeta meta)
+ throws AuthorizationException, KeyNotFoundException {
+ _blobStore.setBlobMeta(key, meta, null);
+ }
+
+ @Override
+ public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException {
+ _blobStore.deleteBlob(key, null);
+ }
+
+ @Override
+ public InputStreamWithMeta getBlob(String key)
+ throws AuthorizationException, KeyNotFoundException {
+ return _blobStore.getBlob(key, null);
+ }
+
+ @Override
+ public Iterator<String> listKeys() {
+ return _blobStore.listKeys();
+ }
+
+ @Override
+ public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException {
+ return _blobStore.getBlobReplication(key, null);
+ }
+
+ @Override
+ public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException {
+ return _blobStore.updateBlobReplication(key, replication, null);
+ }
+
+ @Override
+ public boolean setClient(Map<String, Object> conf, NimbusClient client) {
+ this.client = client;
+ return true;
+ }
+
+ @Override
+ public void createStateInZookeeper(String key) {
+ // Do nothing
+ }
+
+ @Override
+ public void shutdown() {
+ close();
+ }
+
+ @Override
+ public void close() {
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+ }
+}
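
Putting the client in context: a supervisor-side caller only needs prepare() plus the
key-based accessors, and never passes a Subject. A minimal sketch of downloading a
blob, assuming the same conf map an HdfsBlobStore accepts and a hypothetical key:

    import java.io.ByteArrayOutputStream;
    import java.io.InputStream;
    import java.util.Map;
    import org.apache.storm.hdfs.blobstore.HdfsClientBlobStore;

    public class SupervisorDownloadSketch {
        public static byte[] fetch(Map<String, Object> conf, String key) throws Exception {
            HdfsClientBlobStore client = new HdfsClientBlobStore();
            client.prepare(conf); // builds an HdfsBlobStore internally; Subjects stay null
            try (InputStream in = client.getBlob(key)) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) > 0) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } finally {
                client.shutdown();
            }
        }
    }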
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
new file mode 100644
index 0000000..a125793
--- /dev/null
+++ b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
@@ -0,0 +1,540 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.AtomicOutputStream;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.apache.storm.security.auth.FixedGroupsMapping;
+import org.apache.storm.security.auth.NimbusPrincipal;
+import org.apache.storm.security.auth.SingleUserPrincipal;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+public class BlobStoreTest {
+
+ @ClassRule
+ public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();
+
+ private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class);
+ URI base;
+ File baseFile;
+ private static final Map<String, Object> CONF = new HashMap<>();
+ public static final int READ = 0x01;
+ public static final int WRITE = 0x02;
+ public static final int ADMIN = 0x04;
+
+ @Before
+ public void init() {
+ initializeConfigs();
+ baseFile = new File("/tmp/blob-store-test-" + UUID.randomUUID());
+ base = baseFile.toURI();
+ }
+
+ @After
+ public void cleanup()
+ throws IOException {
+ FileUtils.deleteDirectory(baseFile);
+ }
+
+ // Method which initializes nimbus admin
+ public static void initializeConfigs() {
+ CONF.put(Config.NIMBUS_ADMINS, "admin");
+ CONF.put(Config.NIMBUS_ADMINS_GROUPS, "adminsGroup");
+
+ // Construct a groups mapping for the FixedGroupsMapping class
+ Map<String, Set<String>> groupsMapping = new HashMap<>();
+ Set<String> groupSet = new HashSet<>();
+ groupSet.add("adminsGroup");
+ groupsMapping.put("adminsGroupsUser", groupSet);
+
+ // Now create a params map to put it into our conf
+ Map<String, Object> paramMap = new HashMap<>();
+ paramMap.put(FixedGroupsMapping.STORM_FIXED_GROUP_MAPPING, groupsMapping);
+ CONF.put(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN, "org.apache.storm.security.auth.FixedGroupsMapping");
+ CONF.put(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS, paramMap);
+ CONF.put(Config.NIMBUS_SUPERVISOR_USERS, "supervisor");
+ }
+
+ //Gets Nimbus Subject with NimbusPrincipal set on it
+ public static Subject getNimbusSubject() {
+ Subject nimbus = new Subject();
+ nimbus.getPrincipals().add(new NimbusPrincipal());
+ return nimbus;
+ }
+
+ // Overload of assertStoreHasExactly that accepts a Subject in order to check authorization
+ public static void assertStoreHasExactly(BlobStore store, Subject who, String... keys)
+ throws IOException, KeyNotFoundException, AuthorizationException {
+ Set<String> expected = new HashSet<>(Arrays.asList(keys));
+ Set<String> found = new HashSet<>();
+ Iterator<String> c = store.listKeys();
+ while (c.hasNext()) {
+ String keyName = c.next();
+ found.add(keyName);
+ }
+ Set<String> extra = new HashSet<>(found);
+ extra.removeAll(expected);
+ assertTrue("Found extra keys in the blob store " + extra, extra.isEmpty());
+ Set<String> missing = new HashSet<>(expected);
+ missing.removeAll(found);
+ assertTrue("Found keys missing from the blob store " + missing, missing.isEmpty());
+ }
+
+ public static void assertStoreHasExactly(BlobStore store, String... keys)
+ throws IOException, KeyNotFoundException, AuthorizationException {
+ assertStoreHasExactly(store, null, keys);
+ }
+
+ // Overload of readInt that accepts a Subject in order to check authorization (security turned on)
+ public static int readInt(BlobStore store, Subject who, String key) throws IOException, KeyNotFoundException, AuthorizationException {
+ try (InputStream in = store.getBlob(key, who)) {
+ return in.read();
+ }
+ }
+
+ public static int readInt(BlobStore store, String key)
+ throws IOException, KeyNotFoundException, AuthorizationException {
+ return readInt(store, null, key);
+ }
+
+ public static void readAssertEquals(BlobStore store, String key, int value)
+ throws IOException, KeyNotFoundException, AuthorizationException {
+ assertEquals(value, readInt(store, key));
+ }
+
+ // Asserts a blob's value while checking authorization for the given Subject (security turned on)
+ public void readAssertEqualsWithAuth(BlobStore store, Subject who, String key, int value)
+ throws IOException, KeyNotFoundException, AuthorizationException {
+ assertEquals(value, readInt(store, who, key));
+ }
+
+ private AutoCloseableBlobStoreContainer initHdfs(String dirName)
+ throws Exception {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.BLOBSTORE_DIR, dirName);
+ conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
+ conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3);
+ HdfsBlobStore store = new HdfsBlobStore();
+ store.prepareInternal(conf, null, DFS_CLUSTER_RULE.getDfscluster().getConfiguration(0));
+ return new AutoCloseableBlobStoreContainer(store);
+ }
+
+ private static class AutoCloseableBlobStoreContainer implements AutoCloseable {
+
+ private final HdfsBlobStore blobStore;
+
+ public AutoCloseableBlobStoreContainer(HdfsBlobStore blobStore) {
+ this.blobStore = blobStore;
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.blobStore.shutdown();
+ }
+
+ }
+
+ @Test
+ public void testHdfsReplication()
+ throws Exception {
+ try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstoreReplication")) {
+ testReplication("/storm/blobstoreReplication/test", container.blobStore);
+ }
+ }
+
+ @Test
+ public void testBasicHdfs()
+ throws Exception {
+ try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore1")) {
+ testBasic(container.blobStore);
+ }
+ }
+
+ @Test
+ public void testMultipleHdfs()
+ throws Exception {
+ // use different blobstore dir so it doesn't conflict with other tests
+ try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore2")) {
+ testMultiple(container.blobStore);
+ }
+ }
+
+ @Test
+ public void testHdfsWithAuth()
+ throws Exception {
+ // use different blobstore dir so it doesn't conflict with other tests
+ try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore3")) {
+ testWithAuthentication(container.blobStore);
+ }
+ }
+
+ // Test for replication.
+ public void testReplication(String path, BlobStore store)
+ throws Exception {
+ SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+ metadata.set_replication_factor(4);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
+ out.write(1);
+ }
+ assertStoreHasExactly(store, "test");
+ assertEquals("Blobstore replication not matching", store.getBlobReplication("test", null), 4);
+ store.deleteBlob("test", null);
+
+ //Test for replication with a nimbus admin as the user
+ Subject admin = getSubject("admin");
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ metadata.set_replication_factor(4);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) {
+ out.write(1);
+ }
+ assertStoreHasExactly(store, "test");
+ assertEquals("Blobstore replication not matching", store.getBlobReplication("test", admin), 4);
+ store.updateBlobReplication("test", 5, admin);
+ assertEquals("Blobstore replication not matching", store.getBlobReplication("test", admin), 5);
+ store.deleteBlob("test", admin);
+
+ //Test for replication using SUPERVISOR access
+ Subject supervisor = getSubject("supervisor");
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ metadata.set_replication_factor(4);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) {
+ out.write(1);
+ }
+ assertStoreHasExactly(store, "test");
+ assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 4);
+ store.updateBlobReplication("test", 5, supervisor);
+ assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 5);
+ store.deleteBlob("test", supervisor);
+
+ Subject adminsGroupsUser = getSubject("adminsGroupsUser");
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ metadata.set_replication_factor(4);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, adminsGroupsUser)) {
+ out.write(1);
+ }
+ assertStoreHasExactly(store, "test");
+ assertEquals("Blobstore replication not matching", store.getBlobReplication("test", adminsGroupsUser), 4);
+ store.updateBlobReplication("test", 5, adminsGroupsUser);
+ assertEquals("Blobstore replication not matching", store.getBlobReplication("test", adminsGroupsUser), 5);
+ store.deleteBlob("test", adminsGroupsUser);
+
+ //Test for a user having read or write or admin access to read replication for a blob
+ String createSubject = "createSubject";
+ String writeSubject = "writeSubject";
+ String adminSubject = "adminSubject";
+ Subject who = getSubject(createSubject);
+ AccessControl writeAccess = new AccessControl(AccessControlType.USER, READ); // READ access suffices to view replication
+ AccessControl adminAccess = new AccessControl(AccessControlType.USER, ADMIN);
+ writeAccess.set_name(writeSubject);
+ adminAccess.set_name(adminSubject);
+ List<AccessControl> acl = Arrays.asList(writeAccess, adminAccess);
+ metadata = new SettableBlobMeta(acl);
+ metadata.set_replication_factor(4);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
+ out.write(1);
+ }
+ assertStoreHasExactly(store, "test");
+ who = getSubject(writeSubject);
+ assertEquals("Blobstore replication not matching", store.getBlobReplication("test", who), 4);
+
+ //Test for a user having WRITE or ADMIN privileges to change replication of a blob
+ who = getSubject(adminSubject);
+ store.updateBlobReplication("test", 5, who);
+ assertEquals("Blobstore replication not matching", store.getBlobReplication("test", who), 5);
+ store.deleteBlob("test", getSubject(createSubject));
+ }
+
+ public Subject getSubject(String name) {
+ Subject subject = new Subject();
+ SingleUserPrincipal user = new SingleUserPrincipal(name);
+ subject.getPrincipals().add(user);
+ return subject;
+ }
+
+ // Checks blobstore operations when authentication is turned on
+ public void testWithAuthentication(BlobStore store)
+ throws Exception {
+ //Test for Nimbus Admin
+ Subject admin = getSubject("admin");
+ assertStoreHasExactly(store);
+ SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) {
+ assertStoreHasExactly(store, "test");
+ out.write(1);
+ }
+ store.deleteBlob("test", admin);
+
+ //Test for Nimbus Groups Admin
+ Subject adminsGroupsUser = getSubject("adminsGroupsUser");
+ assertStoreHasExactly(store);
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, adminsGroupsUser)) {
+ assertStoreHasExactly(store, "test");
+ out.write(1);
+ }
+ store.deleteBlob("test", adminsGroupsUser);
+
+ //Test for Supervisor Admin
+ Subject supervisor = getSubject("supervisor");
+ assertStoreHasExactly(store);
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) {
+ assertStoreHasExactly(store, "test");
+ out.write(1);
+ }
+ store.deleteBlob("test", supervisor);
+
+ //Test for Nimbus itself as a user
+ Subject nimbus = getNimbusSubject();
+ assertStoreHasExactly(store);
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, nimbus)) {
+ assertStoreHasExactly(store, "test");
+ out.write(1);
+ }
+ store.deleteBlob("test", nimbus);
+
+ // Test with a dummy test_subject for cases where subject != null (security turned on)
+ Subject who = getSubject("test_subject");
+ assertStoreHasExactly(store);
+
+ // Tests for case when subject != null (security turned on) and
+ // acls for the blob are set to WORLD_EVERYTHING
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
+ out.write(1);
+ }
+ assertStoreHasExactly(store, "test");
+ // Testing whether acls are set to WORLD_EVERYTHING
+ assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+ readAssertEqualsWithAuth(store, who, "test", 1);
+
+ LOG.info("Deleting test");
+ store.deleteBlob("test", who);
+ assertStoreHasExactly(store);
+
+ // Tests for case when subject != null (security turned on) and
+ // acls are not set for the blob (DEFAULT)
+ LOG.info("Creating test again");
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
+ out.write(2);
+ }
+ assertStoreHasExactly(store, "test");
+ // The ACL should not contain WORLD_EVERYTHING because the subject is neither null nor empty.
+ // It should however contain USER_EVERYTHING, as the user needs complete access to the blob.
+ assertFalse("ACL should not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+ readAssertEqualsWithAuth(store, who, "test", 2);
+
+ LOG.info("Updating test");
+ try (AtomicOutputStream out = store.updateBlob("test", who)) {
+ out.write(3);
+ }
+ assertStoreHasExactly(store, "test");
+ readAssertEqualsWithAuth(store, who, "test", 3);
+
+ LOG.info("Updating test again");
+ try (AtomicOutputStream out = store.updateBlob("test", who)) {
+ out.write(4);
+ }
+ LOG.info("SLEEPING");
+ Thread.sleep(2);
+ assertStoreHasExactly(store, "test");
+ readAssertEqualsWithAuth(store, who, "test", 3);
+
+ //Test for subject with no principals and acls set to WORLD_EVERYTHING
+ who = new Subject();
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+ LOG.info("Creating test");
+ try (AtomicOutputStream out = store.createBlob("test-empty-subject-WE", metadata, who)) {
+ out.write(2);
+ }
+ assertStoreHasExactly(store, "test-empty-subject-WE", "test");
+ // Testing whether acls are set to WORLD_EVERYTHING
+ assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+ readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
+
+ //Test for subject with no principals and acls set to DEFAULT
+ who = new Subject();
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ LOG.info("Creating other");
+ try (AtomicOutputStream out = store.createBlob("test-empty-subject-DEF", metadata, who)) {
+ out.write(2);
+ }
+ assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE");
+ // Testing whether acls are set to WORLD_EVERYTHING
+ assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+ readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
+
+ if (store instanceof HdfsBlobStore) {
+ ((HdfsBlobStore) store).fullCleanup(1);
+ } else {
+ fail("Error the blobstore is of unknowntype");
+ }
+ }
+
+ public void testBasic(BlobStore store)
+ throws Exception {
+ assertStoreHasExactly(store);
+ LOG.info("Creating test");
+ // Tests for case when subject == null (security turned off) and
+ // acls for the blob are set to WORLD_EVERYTHING
+ SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
+ out.write(1);
+ }
+ assertStoreHasExactly(store, "test");
+ // Testing whether acls are set to WORLD_EVERYTHING
+ assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+ readAssertEquals(store, "test", 1);
+
+ LOG.info("Deleting test");
+ store.deleteBlob("test", null);
+ assertStoreHasExactly(store);
+
+ // The following tests exercise the update blob interface
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+ LOG.info("Creating test again");
+ try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
+ out.write(2);
+ }
+ assertStoreHasExactly(store, "test");
+ readAssertEquals(store, "test", 2);
+ LOG.info("Updating test");
+ try (AtomicOutputStream out = store.updateBlob("test", null)) {
+ out.write(3);
+ }
+ assertStoreHasExactly(store, "test");
+ readAssertEquals(store, "test", 3);
+
+ LOG.info("Updating test again");
+ try (AtomicOutputStream out = store.updateBlob("test", null)) {
+ out.write(4);
+ }
+ LOG.info("SLEEPING");
+ Thread.sleep(2);
+
+ if (store instanceof HdfsBlobStore) {
+ ((HdfsBlobStore) store).fullCleanup(1);
+ } else {
+ fail("Error the blobstore is of unknowntype");
+ }
+ }
+
+ public void testMultiple(BlobStore store)
+ throws Exception {
+ assertStoreHasExactly(store);
+ LOG.info("Creating test");
+ try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), null)) {
+ out.write(1);
+ }
+ assertStoreHasExactly(store, "test");
+ readAssertEquals(store, "test", 1);
+
+ LOG.info("Creating other");
+ try (AtomicOutputStream out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
+ null)) {
+ out.write(2);
+ }
+ assertStoreHasExactly(store, "test", "other");
+ readAssertEquals(store, "test", 1);
+ readAssertEquals(store, "other", 2);
+
+ LOG.info("Updating other");
+ try (AtomicOutputStream out = store.updateBlob("other", null)) {
+ out.write(5);
+ }
+ assertStoreHasExactly(store, "test", "other");
+ readAssertEquals(store, "test", 1);
+ readAssertEquals(store, "other", 5);
+
+ LOG.info("Deleting test");
+ store.deleteBlob("test", null);
+ assertStoreHasExactly(store, "other");
+ readAssertEquals(store, "other", 5);
+
+ LOG.info("Creating test again");
+ try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
+ null)) {
+ out.write(2);
+ }
+ assertStoreHasExactly(store, "test", "other");
+ readAssertEquals(store, "test", 2);
+ readAssertEquals(store, "other", 5);
+
+ LOG.info("Updating test");
+ try (AtomicOutputStream out = store.updateBlob("test", null)) {
+ out.write(3);
+ }
+ assertStoreHasExactly(store, "test", "other");
+ readAssertEquals(store, "test", 3);
+ readAssertEquals(store, "other", 5);
+
+ LOG.info("Deleting other");
+ store.deleteBlob("other", null);
+ assertStoreHasExactly(store, "test");
+ readAssertEquals(store, "test", 3);
+
+ LOG.info("Updating test again");
+ try (AtomicOutputStream out = store.updateBlob("test", null)) {
+ out.write(4);
+ }
+ LOG.info("SLEEPING");
+ Thread.sleep(2);
+
+ if (store instanceof HdfsBlobStore) {
+ ((HdfsBlobStore) store).fullCleanup(1);
+ } else {
+ fail("Error the blobstore is of unknowntype");
+ }
+ assertStoreHasExactly(store, "test");
+ readAssertEquals(store, "test", 3);
+ }
+}
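The READ, WRITE and ADMIN constants declared at the top of BlobStoreTest mirror the bit flags that BlobStoreAclHandler exposes: access levels combine with bitwise OR and are tested with bitwise AND. A small sketch of that scheme, assuming only the flag values shown in the test (not the actual BlobStoreAclHandler implementation):

    public class AccessMaskSketch {
        public static final int READ = 0x01;
        public static final int WRITE = 0x02;
        public static final int ADMIN = 0x04;

        // True only if every requested bit is present in the mask.
        static boolean hasAccess(int mask, int wanted) {
            return (mask & wanted) == wanted;
        }

        public static void main(String[] args) {
            int mask = READ | WRITE;                    // 0x03
            System.out.println(hasAccess(mask, READ));  // true
            System.out.println(hasAccess(mask, WRITE)); // true
            System.out.println(hasAccess(mask, ADMIN)); // false
        }
    }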
http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
new file mode 100644
index 0000000..752e563
--- /dev/null
+++ b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
@@ -0,0 +1,224 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.storm.blobstore.BlobStoreFile;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class HdfsBlobStoreImplTest {
+
+ @ClassRule
+ public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();
+
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImplTest.class);
+
+ // Key dirs are numbered from 0 up to the number of buckets; pick one so we know where to look.
+ private static final String KEYDIR = "0";
+ private final Path blobDir = new Path("/storm/blobstore1");
+ private final Path fullKeyDir = new Path(blobDir, KEYDIR);
+ private static final String BLOBSTORE_DATA = "data";
+
+ public class TestHdfsBlobStoreImpl extends HdfsBlobStoreImpl implements AutoCloseable {
+
+ public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws IOException {
+ super(path, conf);
+ }
+
+ public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf,
+ Configuration hconf) throws IOException {
+ super(path, conf, hconf);
+ }
+
+ protected Path getKeyDir(String key) {
+ return new Path(new Path(blobDir, KEYDIR), key);
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.shutdown();
+ }
+ }
+
+ // Be careful about adding additional tests as the dfscluster will be shared
+ @Test
+ public void testMultiple() throws Exception {
+ String testString = "testingblob";
+ String validKey = "validkeyBasic";
+
+ //Will be closed automatically when shutting down the DFS cluster
+ FileSystem fs = DFS_CLUSTER_RULE.getDfscluster().getFileSystem();
+ Map<String, Object> conf = new HashMap<>();
+
+ try (TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, DFS_CLUSTER_RULE.getHadoopConf())) {
+ // should have created blobDir
+ assertTrue("BlobStore dir wasn't created", fs.exists(blobDir));
+ assertEquals("BlobStore dir was created with wrong permissions",
+ HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION, fs.getFileStatus(blobDir).getPermission());
+
+ // test exist with non-existent key
+ assertFalse("file exists but shouldn't", hbs.exists("bogus"));
+
+ // test write
+ BlobStoreFile pfile = hbs.write(validKey, false);
+ // Adding metadata to avoid null pointer exception
+ SettableBlobMeta meta = new SettableBlobMeta();
+ meta.set_replication_factor(1);
+ pfile.setMetadata(meta);
+ try (OutputStream ios = pfile.getOutputStream()) {
+ ios.write(testString.getBytes(StandardCharsets.UTF_8));
+ }
+
+ // test commit creates properly
+ assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir));
+ pfile.commit();
+ Path dataFile = new Path(new Path(fullKeyDir, validKey), BLOBSTORE_DATA);
+ assertTrue("blob data not committed", fs.exists(dataFile));
+ assertEquals("BlobStore dir was created with wrong permissions",
+ HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission());
+ assertTrue("key doesn't exist but should", hbs.exists(validKey));
+
+ // test read
+ BlobStoreFile readpFile = hbs.read(validKey);
+ try (InputStream inStream = readpFile.getInputStream()) {
+ String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
+ assertEquals("string read from blob doesn't match", testString, readString);
+ }
+
+ // test listkeys
+ Iterator<String> keys = hbs.listKeys();
+ assertTrue("blob has one key", keys.hasNext());
+ assertEquals("one key in blobstore", validKey, keys.next());
+
+ // delete
+ hbs.deleteKey(validKey);
+ assertFalse("key not deleted", fs.exists(dataFile));
+ assertFalse("key not deleted", hbs.exists(validKey));
+
+ // Now do multiple
+ String testString2 = "testingblob2";
+ String validKey2 = "validkey2";
+
+ // test write
+ pfile = hbs.write(validKey, false);
+ pfile.setMetadata(meta);
+ try (OutputStream ios = pfile.getOutputStream()) {
+ ios.write(testString.getBytes(StandardCharsets.UTF_8));
+ }
+
+ // test commit creates properly
+ assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir));
+ pfile.commit();
+ assertTrue("blob data not committed", fs.exists(dataFile));
+ assertEquals("BlobStore dir was created with wrong permissions",
+ HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission());
+ assertTrue("key doesn't exist but should", hbs.exists(validKey));
+
+ // test write again
+ pfile = hbs.write(validKey2, false);
+ pfile.setMetadata(meta);
+ try (OutputStream ios2 = pfile.getOutputStream()) {
+ ios2.write(testString2.getBytes(StandardCharsets.UTF_8));
+ }
+
+ // test commit second creates properly
+ pfile.commit();
+ Path dataFile2 = new Path(new Path(fullKeyDir, validKey2), BLOBSTORE_DATA);
+ assertTrue("blob data not committed", fs.exists(dataFile2));
+ assertEquals("BlobStore dir was created with wrong permissions",
+ HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile2).getPermission());
+ assertTrue("key doesn't exist but should", hbs.exists(validKey2));
+
+ // test listkeys
+ keys = hbs.listKeys();
+ int total = 0;
+ boolean key1Found = false;
+ boolean key2Found = false;
+ while (keys.hasNext()) {
+ total++;
+ String key = keys.next();
+ if (key.equals(validKey)) {
+ key1Found = true;
+ } else if (key.equals(validKey2)) {
+ key2Found = true;
+ } else {
+ fail("Found key that wasn't expected: " + key);
+ }
+ }
+ assertEquals("number of keys is wrong", 2, total);
+ assertTrue("blobstore missing key1", key1Found);
+ assertTrue("blobstore missing key2", key2Found);
+
+ // test read of the first key
+ readpFile = hbs.read(validKey);
+ try (InputStream inStream = readpFile.getInputStream()) {
+ String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
+ assertEquals("string read from blob doesn't match", testString, readString);
+ }
+
+ // test read of the second key
+ readpFile = hbs.read(validKey2);
+ try (InputStream inStream = readpFile.getInputStream()) {
+ String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
+ assertEquals("string read from blob doesn't match", testString2, readString);
+ }
+
+ hbs.deleteKey(validKey);
+ assertFalse("key not deleted", hbs.exists(validKey));
+ hbs.deleteKey(validKey2);
+ assertFalse("key not deleted", hbs.exists(validKey2));
+ }
+ }
+
+ @Test
+ public void testGetFileLength() throws Exception {
+ Map<String, Object> conf = new HashMap<>();
+ String validKey = "validkeyBasic";
+ String testString = "testingblob";
+ try (TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, DFS_CLUSTER_RULE.getHadoopConf())) {
+ BlobStoreFile pfile = hbs.write(validKey, false);
+ // Adding metadata to avoid null pointer exception
+ SettableBlobMeta meta = new SettableBlobMeta();
+ meta.set_replication_factor(1);
+ pfile.setMetadata(meta);
+ try (OutputStream ios = pfile.getOutputStream()) {
+ ios.write(testString.getBytes(StandardCharsets.UTF_8));
+ }
+ assertEquals(testString.getBytes(StandardCharsets.UTF_8).length, pfile.getFileLength());
+ }
+ }
+}
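Both test classes depend on MiniDFSClusterRule from org.apache.storm.hdfs.testing, which is not part of this diff. A hedged sketch of how such a JUnit rule could wrap Hadoop's MiniDFSCluster as an ExternalResource, matching the getDfscluster() and getHadoopConf() accessors the tests call (the real class may differ):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.junit.rules.ExternalResource;

    public class MiniDfsClusterRuleSketch extends ExternalResource {
        private Configuration hadoopConf;
        private MiniDFSCluster dfscluster;

        @Override
        protected void before() throws Throwable {
            // Spin up a single-node, in-process HDFS for the duration of the test class.
            hadoopConf = new Configuration();
            dfscluster = new MiniDFSCluster.Builder(hadoopConf).build();
            dfscluster.waitActive();
        }

        @Override
        protected void after() {
            if (dfscluster != null) {
                dfscluster.shutdown();
            }
        }

        public MiniDFSCluster getDfscluster() {
            return dfscluster;
        }

        public Configuration getHadoopConf() {
            return hadoopConf;
        }
    }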
[3/3] storm git commit: Merge branch 'STORM-2916' of
https://github.com/Ethanlm/storm into STORM-2916-merge
Posted by ka...@apache.org.
Merge branch 'STORM-2916' of https://github.com/Ethanlm/storm into STORM-2916-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d68416b2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d68416b2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d68416b2
Branch: refs/heads/master
Commit: d68416b2447756e2fdf31ef63cbd3428eb299899
Parents: 18045a3 f58d472
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Feb 2 00:24:05 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 2 00:24:05 2018 +0900
----------------------------------------------------------------------
external/storm-blobstore-migration/pom.xml | 2 +-
external/storm-hdfs-blobstore/pom.xml | 251 +++++++++
.../storm/hdfs/blobstore/HdfsBlobStore.java | 394 ++++++++++++++
.../storm/hdfs/blobstore/HdfsBlobStoreFile.java | 196 +++++++
.../storm/hdfs/blobstore/HdfsBlobStoreImpl.java | 312 +++++++++++
.../hdfs/blobstore/HdfsClientBlobStore.java | 130 +++++
.../storm/hdfs/blobstore/BlobStoreTest.java | 540 ++++++++++++++++++
.../hdfs/blobstore/HdfsBlobStoreImplTest.java | 224 ++++++++
.../storm/hdfs/blobstore/HdfsBlobStore.java | 395 --------------
.../storm/hdfs/blobstore/HdfsBlobStoreFile.java | 196 -------
.../storm/hdfs/blobstore/HdfsBlobStoreImpl.java | 312 -----------
.../hdfs/blobstore/HdfsClientBlobStore.java | 130 -----
.../storm/hdfs/blobstore/BlobStoreTest.java | 544 -------------------
.../hdfs/blobstore/HdfsBlobStoreImplTest.java | 226 --------
pom.xml | 1 +
15 files changed, 2049 insertions(+), 1804 deletions(-)
----------------------------------------------------------------------