Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:34 UTC

[45/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java
new file mode 100644
index 0000000..5b3866d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.IPrincipalToLocal;
+import org.apache.storm.security.auth.NimbusPrincipal;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Provides common handling of acls for Blobstores.
+ * Also contains some static utility functions related to Blobstores.
+ */
+public class BlobStoreAclHandler {
+    public static final Logger LOG = LoggerFactory.getLogger(BlobStoreAclHandler.class);
+    private final IPrincipalToLocal _ptol;
+
+    public static final int READ = 0x01;
+    public static final int WRITE = 0x02;
+    public static final int ADMIN = 0x04;
+    public static final List<AccessControl> WORLD_EVERYTHING =
+            Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN));
+    public static final List<AccessControl> DEFAULT = new ArrayList<AccessControl>();
+    private Set<String> _supervisors;
+    private Set<String> _admins;
+    private boolean doAclValidation;
+
+    public BlobStoreAclHandler(Map conf) {
+        _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
+        _supervisors = new HashSet<String>();
+        _admins = new HashSet<String>();
+        if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) {
+            _supervisors.addAll((List<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS));
+        }
+        if (conf.containsKey(Config.NIMBUS_ADMINS)) {
+            _admins.addAll((List<String>)conf.get(Config.NIMBUS_ADMINS));
+        }
+        if (conf.containsKey(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED)) {
+           doAclValidation = (boolean)conf.get(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED);
+        }
+    }
+
+    private static AccessControlType parseACLType(String type) {
+        if ("other".equalsIgnoreCase(type) || "o".equalsIgnoreCase(type)) {
+            return AccessControlType.OTHER;
+        } else if ("user".equalsIgnoreCase(type) || "u".equalsIgnoreCase(type)) {
+            return AccessControlType.USER;
+        }
+        throw new IllegalArgumentException(type+" is not a valid access control type");
+    }
+
+    private static int parseAccess(String access) {
+        int ret = 0;
+        for (char c: access.toCharArray()) {
+            if ('r' == c) {
+                ret = ret | READ;
+            } else if ('w' == c) {
+                ret = ret | WRITE;
+            } else if ('a' == c) {
+                ret = ret | ADMIN;
+            } else if ('-' == c) {
+                //ignored
+            } else {
+                throw new IllegalArgumentException("Invalid access character '" + c + "' in '" + access + "'");
+            }
+        }
+        return ret;
+    }
+
+    public static AccessControl parseAccessControl(String str) {
+        String[] parts = str.split(":");
+        String type = "other";
+        String name = "";
+        String access = "-";
+        if (parts.length > 3) {
+            throw new IllegalArgumentException("Don't know how to parse "+str+" into an ACL value");
+        } else if (parts.length == 1) {
+            type = "other";
+            name = "";
+            access = parts[0];
+        } else if (parts.length == 2) {
+            type = "user";
+            name = parts[0];
+            access = parts[1];
+        } else if (parts.length == 3) {
+            type = parts[0];
+            name = parts[1];
+            access = parts[2];
+        }
+        AccessControl ret = new AccessControl();
+        ret.set_type(parseACLType(type));
+        ret.set_name(name);
+        ret.set_access(parseAccess(access));
+        return ret;
+    }
+
+    private static String accessToString(int access) {
+        StringBuilder ret = new StringBuilder();
+        ret.append(((access & READ) > 0) ? "r" : "-");
+        ret.append(((access & WRITE) > 0) ? "w" : "-");
+        ret.append(((access & ADMIN) > 0) ? "a" : "-");
+        return ret.toString();
+    }
+
+    public static String accessControlToString(AccessControl ac) {
+        StringBuilder ret = new StringBuilder();
+        switch(ac.get_type()) {
+            case OTHER:
+                ret.append("o");
+                break;
+            case USER:
+                ret.append("u");
+                break;
+            default:
+                throw new IllegalArgumentException("Don't know what a type of "+ac.get_type()+" means ");
+        }
+        ret.append(":");
+        if (ac.is_set_name()) {
+            ret.append(ac.get_name());
+        }
+        ret.append(":");
+        ret.append(accessToString(ac.get_access()));
+        return ret.toString();
+    }
+
+    public static void validateSettableACLs(String key, List<AccessControl> acls) throws AuthorizationException {
+        Set<String> aclUsers = new HashSet<>();
+        List<String> duplicateUsers = new ArrayList<>();
+        for (AccessControl acl : acls) {
+            String aclUser = acl.get_name();
+            if (!StringUtils.isEmpty(aclUser) && !aclUsers.add(aclUser)) {
+                LOG.error("'{}' user can't appear more than once in the ACLs", aclUser);
+                duplicateUsers.add(aclUser);
+            }
+        }
+        if (duplicateUsers.size() > 0) {
+            String errorMessage  = "user " + Arrays.toString(duplicateUsers.toArray())
+                    + " can't appear more than once in the ACLs for key [" + key +"].";
+            throw new AuthorizationException(errorMessage);
+        }
+    }
+
+    private Set<String> constructUserFromPrincipals(Subject who) {
+        Set<String> user = new HashSet<String>();
+        if (who != null) {
+            for (Principal p : who.getPrincipals()) {
+                user.add(_ptol.toLocal(p));
+            }
+        }
+        return user;
+    }
+
+    private boolean isAdmin(Subject who) {
+        Set<String> user = constructUserFromPrincipals(who);
+        for (String u : user) {
+            if (_admins.contains(u)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isReadOperation(int operation) {
+        return operation == READ;
+    }
+
+    private boolean isSupervisor(Subject who, int operation) {
+        Set<String> user = constructUserFromPrincipals(who);
+        if (isReadOperation(operation)) {
+            for (String u : user) {
+                if (_supervisors.contains(u)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private boolean isNimbus(Subject who) {
+        Set<Principal> principals;
+        boolean isNimbusInstance = false;
+        if (who != null) {
+            principals = who.getPrincipals();
+            for (Principal principal : principals) {
+                if (principal instanceof NimbusPrincipal) {
+                    isNimbusInstance = true;
+                }
+            }
+        }
+        return isNimbusInstance;
+    }
+
+    public boolean checkForValidUsers(Subject who, int mask) {
+        return isNimbus(who) || isAdmin(who) || isSupervisor(who,mask);
+    }
+
+    /**
+     * The user should be able to see the metadata if and only if they have any of READ, WRITE, or ADMIN
+     */
+    public void validateUserCanReadMeta(List<AccessControl> acl, Subject who, String key) throws AuthorizationException {
+        hasAnyPermissions(acl, (READ|WRITE|ADMIN), who, key);
+    }
+
+    /**
+     * Validates if the user has any of the permissions
+     * mentioned in the mask.
+     * @param acl ACL for the key.
+     * @param mask mask holds the cumulative value of
+     * READ = 1, WRITE = 2 or ADMIN = 4 permissions.
+     * mask = 1 implies READ privilege.
+     * mask = 5 implies READ and ADMIN privileges.
+     * @param who the subject whose permissions are validated
+     * for the key using the ACL and the mask.
+     * @param key Key used to identify the blob.
+     * @throws AuthorizationException
+     */
+    public void hasAnyPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException {
+        if (!doAclValidation) {
+            return;
+        }
+        Set<String> user = constructUserFromPrincipals(who);
+        LOG.debug("user {}", user);
+        if (checkForValidUsers(who, mask)) {
+            return;
+        }
+        for (AccessControl ac : acl) {
+            int allowed = getAllowed(ac, user);
+            LOG.debug(" user: {} allowed: {} key: {}", user, allowed, key);
+            if ((allowed & mask) > 0) {
+                return;
+            }
+        }
+        throw new AuthorizationException(
+                user + " does not have access to " + key);
+    }
+
+    /**
+     * Validates if the user has at least the set of permissions
+     * mentioned in the mask.
+     * @param acl ACL for the key.
+     * @param mask mask holds the cumulative value of
+     * READ = 1, WRITE = 2 or ADMIN = 4 permissions.
+     * mask = 1 implies READ privilege.
+     * mask = 5 implies READ and ADMIN privileges.
+     * @param who the subject whose permissions are validated
+     * for the key using the ACL and the mask.
+     * @param key Key used to identify the blob.
+     * @throws AuthorizationException
+     */
+    public void hasPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException {
+        if (!doAclValidation) {
+            return;
+        }
+        Set<String> user = constructUserFromPrincipals(who);
+        LOG.debug("user {}", user);
+        if (checkForValidUsers(who, mask)) {
+            return;
+        }
+        for (AccessControl ac : acl) {
+            int allowed = getAllowed(ac, user);
+            mask = ~allowed & mask;
+            LOG.debug(" user: {} allowed: {} disallowed: {} key: {}", user, allowed, mask, key);
+        }
+        if (mask == 0) {
+            return;
+        }
+        throw new AuthorizationException(
+                user + " does not have " + namedPerms(mask) + " access to " + key);
+    }
+
+    public void normalizeSettableBlobMeta(String key, SettableBlobMeta meta, Subject who, int opMask) {
+        meta.set_acl(normalizeSettableACLs(key, meta.get_acl(), who, opMask));
+    }
+
+    private String namedPerms(int mask) {
+        StringBuilder b = new StringBuilder();
+        b.append("[");
+        if ((mask & READ) > 0) {
+            b.append("READ ");
+        }
+        if ((mask & WRITE) > 0) {
+            b.append("WRITE ");
+        }
+        if ((mask & ADMIN) > 0) {
+            b.append("ADMIN ");
+        }
+        b.append("]");
+        return b.toString();
+    }
+
+    private int getAllowed(AccessControl ac, Set<String> users) {
+        switch (ac.get_type()) {
+            case OTHER:
+                return ac.get_access();
+            case USER:
+                if (users.contains(ac.get_name())) {
+                    return ac.get_access();
+                }
+                return 0;
+            default:
+                return 0;
+        }
+    }
+
+    private List<AccessControl> removeBadACLs(List<AccessControl> accessControls) {
+        List<AccessControl> resultAcl = new ArrayList<AccessControl>();
+        for (AccessControl control : accessControls) {
+            if(control.get_type().equals(AccessControlType.OTHER) && (control.get_access() == 0 )) {
+                LOG.debug("Removing invalid blobstore world ACL " +
+                        BlobStoreAclHandler.accessControlToString(control));
+                continue;
+            }
+            resultAcl.add(control);
+        }
+        return resultAcl;
+    }
+
+    private final List<AccessControl> normalizeSettableACLs(String key, List<AccessControl> acls, Subject who,
+                                                            int opMask) {
+        List<AccessControl> cleanAcls = removeBadACLs(acls);
+        Set<String> userNames = getUserNamesFromSubject(who);
+        for (String user : userNames) {
+            fixACLsForUser(cleanAcls, user, opMask);
+        }
+        if ((who == null || userNames.isEmpty()) && !worldEverything(acls)) {
+            cleanAcls.addAll(BlobStoreAclHandler.WORLD_EVERYTHING);
+            LOG.debug("Access Control for key {} is normalized to world everything {}", key, cleanAcls);
+            if (!acls.isEmpty())
+                LOG.warn("Access control for blob with key {} is normalized to WORLD_EVERYTHING", key);
+        }
+        return cleanAcls;
+    }
+
+    private boolean worldEverything(List<AccessControl> acls) {
+        boolean isWorldEverything = false;
+        for (AccessControl acl : acls) {
+            if (acl.get_type() == AccessControlType.OTHER && acl.get_access() == (READ|WRITE|ADMIN)) {
+                isWorldEverything = true;
+                break;
+            }
+        }
+        return isWorldEverything;
+    }
+
+    private void fixACLsForUser(List<AccessControl> acls, String user, int mask) {
+        boolean foundUserACL = false;
+        for (AccessControl control : acls) {
+            if (control.get_type() == AccessControlType.USER && control.get_name().equals(user)) {
+                int currentAccess = control.get_access();
+                if ((currentAccess & mask) != mask) {
+                    control.set_access(currentAccess | mask);
+                }
+                foundUserACL = true;
+                break;
+            }
+        }
+        if (!foundUserACL) {
+            AccessControl userACL = new AccessControl();
+            userACL.set_type(AccessControlType.USER);
+            userACL.set_name(user);
+            userACL.set_access(mask);
+            acls.add(userACL);
+        }
+    }
+
+    private Set<String> getUserNamesFromSubject(Subject who) {
+        Set<String> user = new HashSet<String>();
+        if (who != null) {
+            for(Principal p: who.getPrincipals()) {
+                user.add(_ptol.toLocal(p));
+            }
+        }
+        return user;
+    }
+}
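
For reference, BlobStoreAclHandler.parseAccessControl above accepts three spellings: a bare access string ("r--") becomes an OTHER entry, "name:access" defaults to a USER entry, and the full "type:name:access" form spells everything out. A minimal sketch of exercising the parser and the round-trip through accessControlToString; the wrapper class is purely illustrative, only the parse/format calls come from the file above:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.storm.blobstore.BlobStoreAclHandler;
    import org.apache.storm.generated.AccessControl;

    public class AclStringSketch {
        public static void main(String[] args) {
            // "u:alice:rwa" -> USER entry for alice with READ|WRITE|ADMIN
            // "bob:r--"     -> two parts default to a USER entry with READ only
            // "o::r--"      -> OTHER entry granting READ only
            List<String> specs = Arrays.asList("u:alice:rwa", "bob:r--", "o::r--");
            for (String spec : specs) {
                AccessControl ac = BlobStoreAclHandler.parseAccessControl(spec);
                // accessControlToString renders the canonical "type:name:access" form
                System.out.println(spec + " -> " + BlobStoreAclHandler.accessControlToString(ac));
            }
        }
    }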

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreFile.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreFile.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreFile.java
new file mode 100644
index 0000000..9de2f4a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreFile.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+import org.apache.storm.generated.SettableBlobMeta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.regex.Pattern;
+
+/**
+ * Provides a base implementation for creating a blob store based on file-backed storage.
+ */
+public abstract class BlobStoreFile {
+    public static final Logger LOG = LoggerFactory.getLogger(BlobStoreFile.class);
+
+    protected static final String TMP_EXT = ".tmp";
+    protected static final Pattern TMP_NAME_PATTERN = Pattern.compile("^\\d+\\" + TMP_EXT + "$");
+    protected static final String BLOBSTORE_DATA_FILE = "data";
+
+    public abstract void delete() throws IOException;
+    public abstract String getKey();
+    public abstract boolean isTmp();
+    public abstract void setMetadata(SettableBlobMeta meta);
+    public abstract SettableBlobMeta getMetadata();
+    public abstract long getModTime() throws IOException;
+    public abstract InputStream getInputStream() throws IOException;
+    public abstract OutputStream getOutputStream() throws IOException;
+    public abstract void commit() throws IOException;
+    public abstract void cancel() throws IOException;
+    public abstract long getFileLength() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
new file mode 100644
index 0000000..a1499aa
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.NimbusClient;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * The ClientBlobStore has two concrete implementations
+ * 1. NimbusBlobStore
+ * 2. HdfsClientBlobStore
+ *
+ * Create, update, read and delete are some of the basic operations defined by this interface.
+ * Each operation is validated for permissions against a user. We currently have NIMBUS_ADMINS and SUPERVISOR_ADMINS
+ * configuration. NIMBUS_ADMINS are given READ, WRITE and ADMIN access whereas the SUPERVISOR_ADMINS are given READ
+ * access in order to read and download the blobs from the nimbus.
+ *
+ * The ACLs for the blob store are validated against whether the subject is a NIMBUS_ADMIN, SUPERVISOR_ADMIN or USER
+ * who has read, write or admin privileges in order to perform respective operations on the blob.
+ *
+ * For more detailed implementation
+ * @see org.apache.storm.blobstore.NimbusBlobStore
+ * @see org.apache.storm.blobstore.LocalFsBlobStore
+ * @see org.apache.storm.hdfs.blobstore.HdfsClientBlobStore
+ * @see org.apache.storm.hdfs.blobstore.HdfsBlobStore
+ */
+public abstract class ClientBlobStore implements Shutdownable {
+    protected Map conf;
+
+    public interface WithBlobstore {
+        void run(ClientBlobStore blobStore) throws Exception;
+    }
+
+    public static void withConfiguredClient(WithBlobstore withBlobstore) throws Exception {
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        ClientBlobStore blobStore = Utils.getClientBlobStore(conf);
+
+        try {
+            withBlobstore.run(blobStore);
+        } finally {
+            blobStore.shutdown();
+        }
+    }
+
+    /**
+     * Sets up the client API by parsing the configs.
+     * @param conf The storm conf containing the config details.
+     */
+    public abstract void prepare(Map conf);
+
+    /**
+     * Client facing API to create a blob.
+     * @param key blob key name.
+     * @param meta contains ACL information.
+     * @return AtomicOutputStream returns an output stream into which data can be written.
+     * @throws AuthorizationException
+     * @throws KeyAlreadyExistsException
+     */
+    protected abstract AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException;
+
+    /**
+     * Client facing API to update a blob.
+     * @param key blob key name.
+     * @return AtomicOutputStream returns an output stream into which data can be written.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract AtomicOutputStream updateBlob(String key) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Client facing API to read the metadata information.
+     * @param key blob key name.
+     * @return ReadableBlobMeta containing the blob's metadata.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Client facing API to set the metadata for a blob.
+     * @param key blob key name.
+     * @param meta contains ACL information.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    protected abstract void setBlobMetaToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Client facing API to delete a blob.
+     * @param key blob key name.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Client facing API to read a blob.
+     * @param key blob key name.
+     * @return an InputStreamWithMeta to read the blob's contents.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract InputStreamWithMeta getBlob(String key) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * @return Iterator for a list of keys currently present in the blob store.
+     */
+    public abstract Iterator<String> listKeys();
+
+    /**
+     * Client facing API to read the replication of a blob.
+     * @param key blob key name.
+     * @return int indicates the replication factor of a blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Client facing API to update the replication of a blob.
+     * @param key blob key name.
+     * @param replication the replication factor to set for the blob.
+     * @return the updated replication factor of the blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Client facing API to set a nimbus client.
+     * @param conf storm conf
+     * @param client NimbusClient
+     * @return indicates whether the client connection has been set up.
+     */
+    public abstract boolean setClient(Map conf, NimbusClient client);
+
+    /**
+     * Creates state inside ZooKeeper.
+     * Required for the blob store to write to ZooKeeper
+     * when Nimbus HA is turned on, in order to maintain
+     * state consistency.
+     * @param key blob key name.
+     */
+    public abstract void createStateInZookeeper(String key);
+
+    /**
+     * Client facing API to create a blob.
+     * @param key blob key name.
+     * @param meta contains ACL information.
+     * @return AtomicOutputStream returns an output stream into which data can be written.
+     * @throws AuthorizationException
+     * @throws KeyAlreadyExistsException
+     */
+    public final AtomicOutputStream createBlob(String key, SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException {
+        if (meta !=null && meta.is_set_acl()) {
+            BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+        }
+        return createBlobToExtend(key, meta);
+    }
+
+    /**
+     * Client facing API to set the metadata for a blob.
+     * @param key blob key name.
+     * @param meta contains ACL information.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public final void setBlobMeta(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException {
+        if (meta !=null && meta.is_set_acl()) {
+            BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+        }
+        setBlobMetaToExtend(key, meta);
+    }
+
+
+}
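
The static withConfiguredClient helper above wires up whichever ClientBlobStore the storm config selects and guarantees shutdown. A hedged sketch of creating a blob through it; the key name and payload are invented for illustration, and the lambda simply relies on WithBlobstore being a single-method interface:

    import java.nio.charset.StandardCharsets;

    import org.apache.storm.blobstore.AtomicOutputStream;
    import org.apache.storm.blobstore.BlobStoreAclHandler;
    import org.apache.storm.blobstore.ClientBlobStore;
    import org.apache.storm.generated.SettableBlobMeta;

    public class BlobCreateSketch {
        public static void main(String[] args) throws Exception {
            ClientBlobStore.withConfiguredClient(blobStore -> {
                // WORLD_EVERYTHING grants OTHER read/write/admin; createBlob validates the ACLs first
                SettableBlobMeta meta = new SettableBlobMeta();
                meta.set_acl(BlobStoreAclHandler.WORLD_EVERYTHING);
                try (AtomicOutputStream out = blobStore.createBlob("example-config-key", meta)) {
                    out.write("hello blobstore".getBytes(StandardCharsets.UTF_8));
                }
            });
        }
    }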

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/InputStreamWithMeta.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/InputStreamWithMeta.java b/storm-client/src/jvm/org/apache/storm/blobstore/InputStreamWithMeta.java
new file mode 100644
index 0000000..6600a00
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/InputStreamWithMeta.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public abstract class InputStreamWithMeta extends InputStream {
+    public abstract long getVersion() throws IOException;
+    public abstract long getFileLength() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/KeyFilter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/KeyFilter.java b/storm-client/src/jvm/org/apache/storm/blobstore/KeyFilter.java
new file mode 100644
index 0000000..c2d69e1
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/KeyFilter.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+public interface KeyFilter<R> {
+    R filter(String key);
+}
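
KeyFilter is a small projection/filter hook over blob keys. A minimal sketch of an implementation that keeps only keys with a given prefix; treating a null return as "exclude this key" is an assumption here, since the consuming blob store code is not part of this diff:

    import org.apache.storm.blobstore.KeyFilter;

    public class PrefixKeyFilter implements KeyFilter<String> {
        private final String prefix;

        public PrefixKeyFilter(String prefix) {
            this.prefix = prefix;
        }

        // Returns the key itself when it starts with the prefix, null otherwise
        // (assumed convention: callers drop keys for which the filter returns null).
        @Override
        public String filter(String key) {
            return key.startsWith(prefix) ? key : null;
        }
    }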

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
new file mode 100644
index 0000000..5b7713d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.BeginDownloadResult;
+import org.apache.storm.generated.ListBlobsResult;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * NimbusBlobStore is a user facing client API to perform
+ * basic operations such as create, update, delete and read
+ * for the local and HDFS blob stores.
+ *
+ * For the local blob store it is also the client facing API
+ * used by the supervisor to download blobs from nimbus.
+ */
+public class NimbusBlobStore extends ClientBlobStore implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(NimbusBlobStore.class);
+
+    public class NimbusKeyIterator implements Iterator<String> {
+        private ListBlobsResult listBlobs = null;
+        private int offset = 0;
+        private boolean eof = false;
+
+        public NimbusKeyIterator(ListBlobsResult listBlobs) {
+            this.listBlobs = listBlobs;
+            this.eof = (listBlobs.get_keys_size() == 0);
+        }
+
+        private boolean isCacheEmpty() {
+            return listBlobs.get_keys_size() <= offset;
+        }
+
+        private void readMore() throws TException {
+            if (!eof) {
+                offset = 0;
+                synchronized(client) {
+                    listBlobs = client.getClient().listBlobs(listBlobs.get_session());
+                }
+                if (listBlobs.get_keys_size() == 0) {
+                    eof = true;
+                }
+            }
+        }
+
+        @Override
+        public synchronized boolean hasNext() {
+            try {
+                if (isCacheEmpty()) {
+                    readMore();
+                }
+            } catch (TException e) {
+                throw new RuntimeException(e);
+            }
+            return !eof;
+        }
+
+        @Override
+        public synchronized String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            String ret = listBlobs.get_keys().get(offset);
+            offset++;
+            return ret;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Delete Not Supported");
+        }
+    }
+
+    public class NimbusDownloadInputStream extends InputStreamWithMeta {
+        private BeginDownloadResult beginBlobDownload;
+        private byte[] buffer = null;
+        private int offset = 0;
+        private int end = 0;
+        private boolean eof = false;
+
+        public NimbusDownloadInputStream(BeginDownloadResult beginBlobDownload) {
+            this.beginBlobDownload = beginBlobDownload;
+        }
+
+        @Override
+        public long getVersion() throws IOException {
+            return beginBlobDownload.get_version();
+        }
+
+        @Override
+        public synchronized int read() throws IOException {
+            try {
+                if (isEmpty()) {
+                    readMore();
+                    if (eof) {
+                        return -1;
+                    }
+                }
+                int length = Math.min(1, available());
+                if (length == 0) {
+                    return -1;
+                }
+                int ret = buffer[offset] & 0xff; // mask to 0..255 per the InputStream.read() contract
+                offset += length;
+                return ret;
+            } catch(TException exp) {
+                throw new IOException(exp);
+            }
+        }
+
+        @Override
+        public synchronized int read(byte[] b, int off, int len) throws IOException {
+            try {
+                if (isEmpty()) {
+                    readMore();
+                    if (eof) {
+                        return -1;
+                    }
+                }
+                int length = Math.min(len, available());
+                System.arraycopy(buffer, offset, b, off, length);
+                offset += length;
+                return length;
+            } catch(TException exp) {
+                throw new IOException(exp);
+            }
+        }
+
+        private boolean isEmpty() {
+            return buffer == null || offset >= end;
+        }
+
+        private void readMore() throws TException {
+            if (!eof) {
+                ByteBuffer buff;
+                synchronized(client) {
+                    buff = client.getClient().downloadBlobChunk(beginBlobDownload.get_session());
+                }
+                buffer = buff.array();
+                offset = buff.arrayOffset() + buff.position();
+                int length = buff.remaining();
+                end = offset + length;
+                if (length == 0) {
+                    eof = true;
+                }
+            }
+        }
+
+        @Override
+        public synchronized int read(byte[] b) throws IOException {
+            return read(b, 0, b.length);
+        }
+
+        @Override
+        public synchronized int available() {
+            return buffer == null ? 0 : (end - offset);
+        }
+
+        @Override
+        public long getFileLength() {
+            return beginBlobDownload.get_data_size();
+        }
+    }
+
+    public class NimbusUploadAtomicOutputStream extends AtomicOutputStream {
+        private String session;
+        private int maxChunkSize = 4096;
+        private String key;
+
+        public NimbusUploadAtomicOutputStream(String session, int bufferSize, String key) {
+            this.session = session;
+            this.maxChunkSize = bufferSize;
+            this.key = key;
+        }
+
+        @Override
+        public void cancel() throws IOException {
+            try {
+                synchronized(client) {
+                    client.getClient().cancelBlobUpload(session);
+                }
+            } catch (TException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            try {
+                synchronized(client) {
+                    client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(new byte[] {(byte)b}));
+                }
+            } catch (TException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void write(byte []b) throws IOException {
+            write(b, 0, b.length);
+        }
+
+        @Override
+        public void write(byte []b, int offset, int len) throws IOException {
+            try {
+                int end = offset + len;
+                for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) {
+                    int realLen = Math.min(end - realOffset, maxChunkSize);
+                    LOG.debug("Writing {} bytes of {} remaining",realLen,(end-realOffset));
+                    synchronized(client) {
+                        client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(b, realOffset, realLen));
+                    }
+                }
+            } catch (TException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                synchronized(client) {
+                    client.getClient().finishBlobUpload(session);
+                    client.getClient().createStateInZookeeper(key);
+                }
+            } catch (TException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private NimbusClient client;
+    private int bufferSize = 4096;
+
+    @Override
+    public void prepare(Map conf) {
+        this.client = NimbusClient.getConfiguredClient(conf);
+        if (conf != null) {
+            this.bufferSize = ObjectReader.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize);
+        }
+    }
+
+    @Override
+    protected AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta)
+            throws AuthorizationException, KeyAlreadyExistsException {
+        try {
+            synchronized(client) {
+                return new NimbusUploadAtomicOutputStream(client.getClient().beginCreateBlob(key, meta), this.bufferSize, key);
+            }
+        } catch (AuthorizationException | KeyAlreadyExistsException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public AtomicOutputStream updateBlob(String key)
+            throws AuthorizationException, KeyNotFoundException {
+        try {
+            synchronized(client) {
+                return new NimbusUploadAtomicOutputStream(client.getClient().beginUpdateBlob(key), this.bufferSize, key);
+            }
+        } catch (AuthorizationException | KeyNotFoundException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException {
+        try {
+            synchronized(client) {
+                return client.getClient().getBlobMeta(key);
+            }
+        } catch (AuthorizationException | KeyNotFoundException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void setBlobMetaToExtend(String key, SettableBlobMeta meta)
+            throws AuthorizationException, KeyNotFoundException {
+        try {
+            synchronized(client) {
+                client.getClient().setBlobMeta(key, meta);
+            }
+        } catch (AuthorizationException | KeyNotFoundException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException {
+        try {
+            synchronized(client) {
+                client.getClient().deleteBlob(key);
+            }
+        } catch (AuthorizationException | KeyNotFoundException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void createStateInZookeeper(String key) {
+        try {
+            synchronized(client) {
+                client.getClient().createStateInZookeeper(key);
+            }
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public InputStreamWithMeta getBlob(String key) throws AuthorizationException, KeyNotFoundException {
+        try {
+            synchronized(client) {
+                return new NimbusDownloadInputStream(client.getClient().beginBlobDownload(key));
+            }
+        } catch (AuthorizationException | KeyNotFoundException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Iterator<String> listKeys() {
+        try {
+            synchronized(client) {
+                return new NimbusKeyIterator(client.getClient().listBlobs(""));
+            }
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException {
+        try {
+            return client.getClient().getBlobReplication(key);
+        } catch (AuthorizationException | KeyNotFoundException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException {
+        try {
+            return client.getClient().updateBlobReplication(key, replication);
+        } catch (AuthorizationException | KeyNotFoundException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean setClient(Map conf, NimbusClient client) {
+        if (this.client != null) {
+            this.client.close();
+        }
+        this.client = client;
+        if (conf != null) {
+            this.bufferSize = ObjectReader.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize);
+        }
+        return true;
+    }
+
+    @Override
+    protected void finalize() {
+        shutdown();
+    }
+
+    @Override
+    public void shutdown() {
+        if (client != null) {
+            client.close();
+            client = null;
+        }
+    }
+
+    @Override
+    public void close() {
+        shutdown();
+    }
+}
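
A hedged sketch of the NimbusBlobStore read path defined above: prepare() opens the Nimbus client, listKeys() pages through keys via NimbusKeyIterator, and getBlob() streams chunks through NimbusDownloadInputStream. Only the printing is illustrative; every call appears in the file above:

    import java.util.Iterator;
    import java.util.Map;

    import org.apache.storm.blobstore.InputStreamWithMeta;
    import org.apache.storm.blobstore.NimbusBlobStore;
    import org.apache.storm.utils.ConfigUtils;

    public class BlobDownloadSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> conf = ConfigUtils.readStormConfig();
            try (NimbusBlobStore blobStore = new NimbusBlobStore()) {
                blobStore.prepare(conf);   // connects to Nimbus and picks up the chunk buffer size
                Iterator<String> keys = blobStore.listKeys();
                while (keys.hasNext()) {
                    String key = keys.next();
                    try (InputStreamWithMeta in = blobStore.getBlob(key)) {
                        System.out.println(key + ": " + in.getFileLength()
                                + " bytes, version " + in.getVersion());
                    }
                }
            }
        }
    }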

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java b/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
new file mode 100644
index 0000000..007a958
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
@@ -0,0 +1,595 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.bolt;
+
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.TimestampExtractor;
+import org.apache.storm.windowing.TupleWindow;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class JoinBolt extends BaseWindowedBolt {
+
+    private OutputCollector collector;
+
+    // Map[StreamName -> Map[Key -> List<Tuple>]  ]
+    HashMap<String, HashMap<Object, ArrayList<Tuple> >> hashedInputs = new HashMap<>(); // holds remaining streams
+
+    // Map[StreamName -> JoinInfo]
+    protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>();
+    protected FieldSelector[] outputFields;  // specified via bolt.select() ... used in declaring Output fields
+//    protected String[] dotSeparatedOutputFieldNames; // fieldNames in x.y.z format w/o stream name, used for naming output fields
+    protected String outputStreamName;
+
+    // Use streamId, source component name OR field in tuple to distinguish incoming tuple streams
+    public enum Selector { STREAM, SOURCE }
+    protected final Selector selectorType;
+
+
+    /**
+     * Calls  JoinBolt(Selector.SOURCE, sourceId, fieldName)
+     * @param sourceId   Id of source component (spout/bolt) from which this bolt is receiving data
+     * @param fieldName  the field to use for joining the stream (x.y.z format)
+     */
+    public JoinBolt(String sourceId, String fieldName) {
+        this(Selector.SOURCE, sourceId, fieldName);
+    }
+    /**
+     *
+     * Introduces the first stream to start the join with. Equivalent SQL ...
+     *       select .... from srcOrStreamId ...
+     * @param type Specifies whether 'srcOrStreamId' refers to stream name/source component
+     * @param srcOrStreamId name of stream OR source component
+     * @param fieldName the field to use for joining the stream (x.y.z format)
+     */
+    public JoinBolt(Selector type, String srcOrStreamId, String fieldName) {
+        selectorType = type;
+
+        joinCriteria.put(srcOrStreamId, new JoinInfo(  new FieldSelector( srcOrStreamId, fieldName) ) );
+    }
+
+    /**
+     * Optional. Allows naming the output stream of this bolt. If not specified, the emits will happen on
+     * the 'default' stream.
+     */
+    public JoinBolt withOutputStream(String streamName) {
+        this.outputStreamName = streamName;
+        return this;
+    }
+
+    /**
+     * Performs an inner join with the newStream.
+     *  SQL    :   from priorStream inner join newStream on newStream.field = priorStream.field1
+     *  same as:   new JoinBolt(priorStream, field1). join(newStream, field, priorStream);
+     *
+     *  Note: priorStream must be previously joined.
+     *    Valid ex:    new JoinBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2);
+     *    Invalid ex:  new JoinBolt(s1,k1). join(s3,k3, s2). join(s2,k2, s1);
+     *
+     *    @param newStream  Either stream name or name of upstream component
+     *    @param field the field on which to perform the join
+     *    @param priorStream the previously joined stream (or component) to join newStream against
+     */
+    public JoinBolt join(String newStream, String field, String priorStream) {
+        return joinCommon(newStream, field, priorStream, JoinType.INNER);
+    }
+
+    /**
+     * Performs a left join with the newStream.
+     *  SQL    :   from stream1  left join stream2  on stream2.field = stream1.field1
+     *  same as:   new JoinBolt(stream1, field1). leftJoin(stream2, field, stream1);
+     *
+     *  Note: priorStream must be previously joined.
+     *    Valid ex:    new JoinBolt(s1,k1). leftJoin(s2,k2, s1). leftJoin(s3,k3, s2);
+     *    Invalid ex:  new JoinBolt(s1,k1). leftJoin(s3,k3, s2). leftJoin(s2,k2, s1);
+     *
+     *    @param newStream  Either a name of a stream or an upstream component
+     *    @param field the field on which to perform the join
+     *    @param priorStream the previously joined stream (or component) to join newStream against
+     */
+    public JoinBolt leftJoin(String newStream, String field, String priorStream) {
+        return joinCommon(newStream, field, priorStream, JoinType.LEFT);
+    }
+
+    private JoinBolt joinCommon(String newStream, String fieldDescriptor, String priorStream, JoinType joinType) {
+        if (hashedInputs.containsKey(newStream)) {
+            throw new IllegalArgumentException("'" + newStream + "' is already part of join. Cannot join with it more than once.");
+        }
+        hashedInputs.put(newStream, new HashMap<Object, ArrayList<Tuple>>());
+        JoinInfo joinInfo = joinCriteria.get(priorStream);
+        if( joinInfo==null )
+            throw new IllegalArgumentException("Stream '" + priorStream + "' was not previously declared");
+
+        FieldSelector field = new FieldSelector(newStream, fieldDescriptor);
+        joinCriteria.put(newStream, new JoinInfo(field, priorStream, joinInfo, joinType) );
+        return this;
+    }
+
+    /**
+     * Specify projection fields, i.e. the fields to include in the output.
+     *      e.g: .select("field1, stream2:field2, field3")
+     * Nested key names are supported for nested types:
+     *      e.g: .select("outerKey1.innerKey1, outerKey1.innerKey2, stream3:outerKey2.innerKey3")
+     * Inner types (non leaf) must be Map<> in order to support nested lookup using this dot notation.
+     * The selected fields implicitly declare the output fieldNames for the bolt.
+     * @param commaSeparatedKeys comma separated list of fields to include in the output
+     * @return this JoinBolt, to allow call chaining
+     */
+    public JoinBolt select(String commaSeparatedKeys) {
+        String[] fieldNames = commaSeparatedKeys.split(",");
+
+        outputFields = new FieldSelector[fieldNames.length];
+        for (int i = 0; i < fieldNames.length; i++) {
+            outputFields[i] = new FieldSelector(fieldNames[i]);
+        }
+        return this;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        String[] outputFieldNames = new String[outputFields.length];
+        for( int i=0; i<outputFields.length; ++i ) {
+            outputFieldNames[i] = outputFields[i].getOutputName() ;
+        }
+        if (outputStreamName!=null) {
+            declarer.declareStream(outputStreamName, new Fields(outputFieldNames));
+        } else {
+            declarer.declare(new Fields(outputFieldNames));
+        }
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        // initialize the hashedInputs data structure
+        int i=0;
+        for ( String stream : joinCriteria.keySet() ) {
+            if(i>0) {
+                hashedInputs.put(stream, new HashMap<Object, ArrayList<Tuple>>());
+            }
+            ++i;
+        }
+        if(outputFields ==null) {
+            throw new IllegalArgumentException("Must specify output fields via .select() method.");
+        }
+    }
+
+    @Override
+    public void execute(TupleWindow inputWindow) {
+        // 1) Perform Join
+        List<Tuple> currentWindow = inputWindow.get();
+        JoinAccumulator joinResult = hashJoin(currentWindow);
+
+        // 2) Emit results
+        for (ResultRecord resultRecord : joinResult.getRecords()) {
+            ArrayList<Object> outputTuple = resultRecord.getOutputFields();
+            if ( outputStreamName==null )
+                // explicit anchoring emits to corresponding input tuples only, as default window anchoring will anchor them to all tuples in window
+                collector.emit( resultRecord.tupleList, outputTuple );
+            else
+                // explicitly anchor emits to corresponding input tuples only, as default window anchoring will anchor them to all tuples in window
+                collector.emit( outputStreamName, resultRecord.tupleList, outputTuple );
+        }
+    }
+
+    private void clearHashedInputs() {
+        for (HashMap<Object, ArrayList<Tuple>> mappings : hashedInputs.values()) {
+            mappings.clear();
+        }
+    }
+
+    protected JoinAccumulator hashJoin(List<Tuple> tuples) {
+        clearHashedInputs();
+
+        JoinAccumulator probe = new JoinAccumulator();
+
+        // 1) Build phase - Segregate tuples in the Window into streams.
+        //    First stream's tuples go into probe, rest into HashMaps in hashedInputs
+        String firstStream = joinCriteria.keySet().iterator().next();
+        for (Tuple tuple : tuples) {
+            String streamId = getStreamSelector(tuple);
+            if ( ! streamId.equals(firstStream) ) {
+                Object field = getJoinField(streamId, tuple);
+                ArrayList<Tuple> recs = hashedInputs.get(streamId).get(field);
+                if(recs == null) {
+                    recs = new ArrayList<Tuple>();
+                    hashedInputs.get(streamId).put(field, recs);
+                }
+                recs.add(tuple);
+
+            }  else {
+                ResultRecord probeRecord = new ResultRecord(tuple, joinCriteria.size() == 1);
+                probe.insert( probeRecord );  // first stream's data goes into the probe
+            }
+        }
+
+        // 2) Join the streams in order of streamJoinOrder
+        int i=0;
+        for (String streamName : joinCriteria.keySet() ) {
+            boolean finalJoin = (i==joinCriteria.size()-1);
+            if(i>0) {
+                probe = doJoin(probe, hashedInputs.get(streamName), joinCriteria.get(streamName), finalJoin);
+            }
+            ++i;
+        }
+
+
+        return probe;
+    }
+
+    // Dispatches to the right join method (inner/left/right/outer) based on the joinInfo.joinType
+    protected JoinAccumulator doJoin(JoinAccumulator probe, HashMap<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
+        final JoinType joinType = joinInfo.getJoinType();
+        switch ( joinType ) {
+            case INNER:
+                return doInnerJoin(probe, buildInput, joinInfo, finalJoin);
+            case LEFT:
+                return doLeftJoin(probe, buildInput, joinInfo, finalJoin);
+            case RIGHT:
+            case OUTER:
+            default:
+                throw new RuntimeException("Unsupported join type : " + joinType.name() );
+        }
+    }
+
+    // inner join - core implementation
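+    // Probe records whose join key is missing, or has no match on the build side, are dropped.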
+    protected JoinAccumulator doInnerJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
+        String[] probeKeyName = joinInfo.getOtherField();
+        JoinAccumulator result = new JoinAccumulator();
+        FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
+        for (ResultRecord rec : probe.getRecords()) {
+            Object probeKey = rec.getField(fieldSelector);
+            if (probeKey!=null) {
+                ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey);
+                if(matchingBuildRecs!=null) {
+                    for (Tuple matchingRec : matchingBuildRecs) {
+                        ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
+                        result.insert(mergedRecord);
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    // left join - core implementation
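+    // Probe (left) side records with a non-null join key are always retained; those without a
+    // match get nulls for the missing fields during projection.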
+    protected JoinAccumulator doLeftJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
+        String[] probeKeyName = joinInfo.getOtherField();
+        JoinAccumulator result = new JoinAccumulator();
+        FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
+        for (ResultRecord rec : probe.getRecords()) {
+            Object probeKey = rec.getField(fieldSelector);
+            if (probeKey!=null) {
+                ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey); // ok if it returns null
+                if (matchingBuildRecs!=null && !matchingBuildRecs.isEmpty() ) {
+                    for (Tuple matchingRec : matchingBuildRecs) {
+                        ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
+                        result.insert(mergedRecord);
+                    }
+                } else {
+                    ResultRecord mergedRecord = new ResultRecord(rec, null, finalJoin);
+                    result.insert(mergedRecord);
+                }
+
+            }
+        }
+        return result;
+    }
+
+
+    // Identifies the join field for the stream and looks it up in 'tuple'. The field may be nested, e.g. outerKey.innerKey
+    private Object getJoinField(String streamId, Tuple tuple) {
+        JoinInfo ji = joinCriteria.get(streamId);
+        if(ji==null) {
+            throw new RuntimeException("Join information for '" + streamId + "' not found. Check the join clauses.");
+        }
+        return lookupField(ji.getJoinField(), tuple);
+    }
+
+    // Returns either the source component name or the stream name for the tuple
+    private String getStreamSelector(Tuple ti) {
+        switch (selectorType) {
+            case STREAM:
+                return ti.getSourceStreamId();
+            case SOURCE:
+                return ti.getSourceComponent();
+            default:
+                throw new RuntimeException(selectorType + " stream selector type not yet supported");
+        }
+    }
+
+
+    protected enum JoinType {INNER, LEFT, RIGHT, OUTER}
+
+    /** Describes how to join the other stream with the current stream */
+    protected static class JoinInfo implements Serializable {
+        final static long serialVersionUID = 1L;
+
+        private JoinType joinType;        // nature of join
+        private FieldSelector field;           // field for the current stream
+        private FieldSelector other;      // field for the other (2nd) stream
+
+
+        public JoinInfo(FieldSelector field) {
+            this.joinType = null;
+            this.field = field;
+            this.other = null;
+        }
+        public JoinInfo(FieldSelector field, String otherStream, JoinInfo otherStreamJoinInfo,  JoinType joinType) {
+            this.joinType = joinType;
+            this.field = field;
+            this.other = new FieldSelector(otherStream, otherStreamJoinInfo.field.getOutputName() );
+        }
+
+        public FieldSelector getJoinField() {
+            return field;
+        }
+
+        public String getOtherStream() {
+            return other.getStreamName();
+        }
+
+        public String[] getOtherField() {
+            return other.getField();
+        }
+
+        public JoinType getJoinType() {
+            return joinType;
+        }
+
+    } // class JoinInfo
+
+    // Holds a partially joined record: one Tuple per stream joined so far, plus (on the final join) the projected output fields
+    protected class ResultRecord {
+
+        ArrayList<Tuple> tupleList = new ArrayList<>(); // contains one Tuple per Stream being joined
+        ArrayList<Object> outFields = null; // refs to fields that will be part of output fields
+
+        // 'generateOutputFields' enables us to avoid projection unless it is the final stream being joined
+        public ResultRecord(Tuple tuple, boolean generateOutputFields) {
+            tupleList.add(tuple);
+            if(generateOutputFields) {
+                outFields = doProjection(tupleList, outputFields);
+            }
+        }
+
+        public ResultRecord(ResultRecord lhs, Tuple rhs, boolean generateOutputFields) {
+            if(lhs!=null)
+                tupleList.addAll(lhs.tupleList);
+            if(rhs!=null)
+                tupleList.add(rhs);
+            if(generateOutputFields) {
+                outFields = doProjection(tupleList, outputFields);
+            }
+        }
+
+        public ArrayList<Object> getOutputFields() {
+            return outFields;
+        }
+
+
+        // Looks up 'fieldSelector' across the tuples in this record; returns null if the field is not found.
+        public Object getField(FieldSelector fieldSelector) {
+            for (Tuple tuple : tupleList) {
+                Object result = lookupField(fieldSelector, tuple);
+                if (result!=null)
+                    return result;
+            }
+            return null;
+        }
+    }
+
+    protected class JoinAccumulator {
+        ArrayList<ResultRecord> records = new ArrayList<>();
+
+        public void insert(ResultRecord tuple) {
+            records.add( tuple );
+        }
+
+        public Collection<ResultRecord> getRecords() {
+            return records;
+        }
+    }
+
+    // Performs projection on the tuples based on 'projectionFields'
+    protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, FieldSelector[] projectionFields) {
+        ArrayList<Object> result = new ArrayList<>(projectionFields.length);
+        // Todo: optimize this computation... perhaps inner loop can be outside to avoid rescanning tuples
+        for ( int i = 0; i < projectionFields.length; i++ ) {
+            boolean missingField = true;
+            for ( Tuple tuple : tuples ) {
+                Object field = lookupField(projectionFields[i], tuple ) ;
+                if (field != null) {
+                    result.add(field);
+                    missingField=false;
+                    break;
+                }
+            }
+            if(missingField) { // add a null for missing fields (usually in case of outer joins)
+                result.add(null);
+            }
+        }
+        return result;
+    }
+
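+    // Identifies a field within a tuple: an optional stream qualifier plus a (possibly nested)
+    // field path, e.g. "stream1:x.y.z" or "x.y.z".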
+    protected static class FieldSelector implements Serializable {
+        String streamName;    // can be null;
+        String[] field;       // nested field "x.y.z"  becomes => String["x","y","z"]
+        String outputName;    // either "stream1:x.y.z" or "x.y.z" depending on whether stream name is present.
+
+        public FieldSelector(String fieldDescriptor)  {  // sample fieldDescriptor = "stream1:x.y.z"
+            int pos = fieldDescriptor.indexOf(':');
+
+            if (pos>0) {  // stream name is specified
+                streamName = fieldDescriptor.substring(0,pos).trim();
+                outputName = fieldDescriptor.trim();
+                field =  fieldDescriptor.substring(pos+1, fieldDescriptor.length()).split("\\.");
+                return;
+            }
+
+            // stream name unspecified
+            streamName = null;
+            if(pos==0) {
+                outputName = fieldDescriptor.substring(1, fieldDescriptor.length() ).trim();
+
+            } else if (pos<0) {
+                outputName = fieldDescriptor.trim();
+            }
+            field =  outputName.split("\\.");
+        }
+
+        /**
+         * @param stream name of stream
+         * @param fieldDescriptor  A simple field descriptor like "x.y.z", without a 'stream1:' stream qualifier.
+         */
+        public FieldSelector(String stream, String fieldDescriptor)  {
+            this(fieldDescriptor);
+            if(fieldDescriptor.indexOf(":")>=0) {
+                throw new IllegalArgumentException("Not expecting stream qualifier ':' in '" + fieldDescriptor
+                        + "'. Stream name '" + stream +  "' is implicit in this context");
+            }
+            this.streamName = stream;
+        }
+
+        public FieldSelector(String stream, String[] field)  {
+            this( stream, String.join(".", field) );
+        }
+
+
+        public String getStreamName() {
+            return streamName;
+        }
+
+        public String[] getField() {
+            return field;
+        }
+
+        public String getOutputName() {
+            return toString();
+        }
+
+        @Override
+        public String toString() {
+            return outputName;
+        }
+    }
+
+    // Extract the field from tuple. Field may be nested field (x.y.z)
+    protected Object lookupField(FieldSelector fieldSelector, Tuple tuple) {
+
+        // Verify the stream name matches, if a stream name was specified.
+        if ( fieldSelector.streamName!=null &&
+                !fieldSelector.streamName.equalsIgnoreCase( getStreamSelector(tuple) ) ) {
+            return null;
+        }
+
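+        // Walk the nested field path: the first segment is read from the tuple itself,
+        // subsequent segments from nested Maps.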
+        Object curr = null;
+        for (int i=0; i < fieldSelector.field.length; i++) {
+            if (i==0) {
+                if (tuple.contains(fieldSelector.field[i]) )
+                    curr = tuple.getValueByField(fieldSelector.field[i]);
+                else
+                    return null;
+            }  else  {
+                curr = ((Map) curr).get(fieldSelector.field[i]);
+                if (curr==null)
+                    return null;
+            }
+        }
+        return curr;
+    }
+
+    // Boilerplate overrides that narrow the return type from the base class to JoinBolt,
+    // so the user doesn't have to downcast when chaining these methods.
+
+    @Override
+    public JoinBolt withWindow(Count windowLength, Count slidingInterval) {
+        return (JoinBolt) super.withWindow(windowLength, slidingInterval);
+    }
+
+    @Override
+    public JoinBolt withWindow(Count windowLength, Duration slidingInterval) {
+        return (JoinBolt) super.withWindow(windowLength, slidingInterval);
+    }
+
+    @Override
+    public JoinBolt withWindow(Duration windowLength, Count slidingInterval) {
+        return (JoinBolt) super.withWindow(windowLength, slidingInterval);
+    }
+
+    @Override
+    public JoinBolt withWindow(Duration windowLength, Duration slidingInterval) {
+        return (JoinBolt) super.withWindow(windowLength, slidingInterval);
+    }
+
+    @Override
+    public JoinBolt withWindow(Count windowLength) {
+        return (JoinBolt) super.withWindow(windowLength);
+    }
+
+    @Override
+    public JoinBolt withWindow(Duration windowLength) {
+        return (JoinBolt) super.withWindow(windowLength);
+    }
+
+    @Override
+    public JoinBolt withTumblingWindow(Count count) {
+        return (JoinBolt) super.withTumblingWindow(count);
+    }
+
+    @Override
+    public JoinBolt withTumblingWindow(Duration duration) {
+        return (JoinBolt) super.withTumblingWindow(duration);
+    }
+
+    @Override
+    public JoinBolt withTimestampField(String fieldName) {
+        return (JoinBolt) super.withTimestampField(fieldName);
+    }
+
+    @Override
+    public JoinBolt withTimestampExtractor(TimestampExtractor timestampExtractor) {
+        return (JoinBolt) super.withTimestampExtractor(timestampExtractor);
+    }
+
+    @Override
+    public JoinBolt withLateTupleStream(String streamId) {
+        return (JoinBolt) super.withLateTupleStream(streamId);
+    }
+
+    @Override
+    public JoinBolt withLag(Duration duration) {
+        return (JoinBolt) super.withLag(duration);
+    }
+
+    @Override
+    public JoinBolt withWatermarkInterval(Duration interval) {
+        return (JoinBolt) super.withWatermarkInterval(interval);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java b/storm-client/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java
new file mode 100644
index 0000000..043dd0c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.callback;
+
+import org.apache.storm.zookeeper.ZkEventTypes;
+import org.apache.storm.zookeeper.ZkKeeperStates;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultWatcherCallBack implements WatcherCallBack {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultWatcherCallBack.class);
+
+    @Override
+    public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) {
+        LOG.debug("Zookeeper state update:  {}, {}, {}", ZkKeeperStates.getStateName(state), ZkEventTypes.getTypeName(type), path);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/callback/WatcherCallBack.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/callback/WatcherCallBack.java b/storm-client/src/jvm/org/apache/storm/callback/WatcherCallBack.java
new file mode 100644
index 0000000..41a50ec
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/callback/WatcherCallBack.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.callback;
+
+import org.apache.zookeeper.Watcher;
+
+public interface WatcherCallBack {
+    public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/callback/ZKStateChangedCallback.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/callback/ZKStateChangedCallback.java b/storm-client/src/jvm/org/apache/storm/callback/ZKStateChangedCallback.java
new file mode 100644
index 0000000..75b0e99
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/callback/ZKStateChangedCallback.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.callback;
+
+import org.apache.zookeeper.Watcher;
+
+public interface ZKStateChangedCallback {
+    public void changed(Watcher.Event.EventType type, String path);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java b/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java
new file mode 100644
index 0000000..60300e2
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import clojure.lang.IFn;
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.Keyword;
+import clojure.lang.Symbol;
+import clojure.lang.RT;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class ClojureBolt implements IRichBolt, FinishedCallback {
+    Map<String, StreamInfo> _fields;
+    List<String> _fnSpec;
+    List<String> _confSpec;
+    List<Object> _params;
+    
+    IBolt _bolt;
+    
+    public ClojureBolt(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
+        _fnSpec = fnSpec;
+        _confSpec = confSpec;
+        _params = params;
+        _fields = fields;
+    }
+
+    @Override
+    public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
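+        // Resolve the Clojure factory fn by namespace/name, apply the constructor params to get a
+        // preparer fn, then call it with (conf, context, collector-map) to obtain the actual IBolt.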
+        IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
+        try {
+            IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
+            final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
+                Keyword.intern(Symbol.create("output-collector")), collector,
+                Keyword.intern(Symbol.create("context")), context});
+            List<Object> args = new ArrayList<Object>() {{
+                add(stormConf);
+                add(context);
+                add(collectorMap);
+            }};
+            
+            _bolt = (IBolt) preparer.applyTo(RT.seq(args));
+            //this is kind of unnecessary for clojure
+            try {
+                _bolt.prepare(stormConf, context, collector);
+            } catch(AbstractMethodError ame) {
+                
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        _bolt.execute(input);
+    }
+
+    @Override
+    public void cleanup() {
+            try {
+                _bolt.cleanup();
+            } catch(AbstractMethodError ame) {
+                
+            }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for(String stream: _fields.keySet()) {
+            StreamInfo info = _fields.get(stream);
+            declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
+        }
+    }
+
+    @Override
+    public void finishedId(Object id) {
+        if(_bolt instanceof FinishedCallback) {
+            ((FinishedCallback) _bolt).finishedId(id);
+        }
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
+        try {
+            return (Map) hof.applyTo(RT.seq(_params));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java b/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java
new file mode 100644
index 0000000..372b306
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import clojure.lang.IFn;
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.Keyword;
+import clojure.lang.Symbol;
+import clojure.lang.RT;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ClojureSpout implements IRichSpout {
+    Map<String, StreamInfo> _fields;
+    List<String> _fnSpec;
+    List<String> _confSpec;
+    List<Object> _params;
+    
+    ISpout _spout;
+    
+    public ClojureSpout(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
+        _fnSpec = fnSpec;
+        _confSpec = confSpec;
+        _params = params;
+        _fields = fields;
+    }
+    
+
+    @Override
+    public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+        IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
+        try {
+            IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
+            final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
+                Keyword.intern(Symbol.create("output-collector")), collector,
+                Keyword.intern(Symbol.create("context")), context});
+            List<Object> args = new ArrayList<Object>() {{
+                add(conf);
+                add(context);
+                add(collectorMap);
+            }};
+            
+            _spout = (ISpout) preparer.applyTo(RT.seq(args));
+            //this is kind of unnecessary for clojure
+            try {
+                _spout.open(conf, context, collector);
+            } catch(AbstractMethodError ame) {
+                
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            _spout.close();
+        } catch(AbstractMethodError ame) {
+                
+        }
+    }
+
+    @Override
+    public void nextTuple() {
+        try {
+            _spout.nextTuple();
+        } catch(AbstractMethodError ame) {
+                
+        }
+
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        try {
+            _spout.ack(msgId);
+        } catch(AbstractMethodError ame) {
+                
+        }
+
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        try {
+            _spout.fail(msgId);
+        } catch(AbstractMethodError ame) {
+                
+        }
+
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for(String stream: _fields.keySet()) {
+            StreamInfo info = _fields.get(stream);
+            declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
+        }
+    }
+    
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
+        try {
+            return (Map) hof.applyTo(RT.seq(_params));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void activate() {
+        try {
+            _spout.activate();
+        } catch(AbstractMethodError ame) {
+                
+        }
+    }
+
+    @Override
+    public void deactivate() {
+        try {
+            _spout.deactivate();
+        } catch(AbstractMethodError ame) {
+                
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java b/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java
new file mode 100644
index 0000000..6de5637
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import java.util.Map;
+
+public class RichShellBolt extends ShellBolt implements IRichBolt {
+    private Map<String, StreamInfo> _outputs;
+    
+    public RichShellBolt(String[] command, Map<String, StreamInfo> outputs) {
+        super(command);
+        _outputs = outputs;
+    }
+    
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for(String stream: _outputs.keySet()) {
+            StreamInfo def = _outputs.get(stream);
+            if(def.is_direct()) {
+                declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
+            } else {
+                declarer.declareStream(stream, new Fields(def.get_output_fields()));                
+            }
+        }
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java b/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java
new file mode 100644
index 0000000..9fb7e73
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.spout.ShellSpout;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import java.util.Map;
+
+public class RichShellSpout extends ShellSpout implements IRichSpout {
+    private Map<String, StreamInfo> _outputs;
+
+    public RichShellSpout(String[] command, Map<String, StreamInfo> outputs) {
+        super(command);
+        _outputs = outputs;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for(String stream: _outputs.keySet()) {
+            StreamInfo def = _outputs.get(stream);
+            if(def.is_direct()) {
+                declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
+            } else {
+                declarer.declareStream(stream, new Fields(def.get_output_fields()));
+            }
+        }
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}