You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:34 UTC
[45/52] [partial] storm git commit: STORM-2441 Break down
'storm-core' to extract client (worker) artifacts
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java
new file mode 100644
index 0000000..5b3866d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.IPrincipalToLocal;
+import org.apache.storm.security.auth.NimbusPrincipal;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Provides common handling of acls for Blobstores.
+ * Also contains some static utility functions related to Blobstores.
+ */
+public class BlobStoreAclHandler {
+ public static final Logger LOG = LoggerFactory.getLogger(BlobStoreAclHandler.class);
+ private final IPrincipalToLocal _ptol;
+
+ public static final int READ = 0x01;
+ public static final int WRITE = 0x02;
+ public static final int ADMIN = 0x04;
+ public static final List<AccessControl> WORLD_EVERYTHING =
+ Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN));
+ public static final List<AccessControl> DEFAULT = new ArrayList<AccessControl>();
+ private Set<String> _supervisors;
+ private Set<String> _admins;
+ private boolean doAclValidation;
+
+ public BlobStoreAclHandler(Map conf) {
+ _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
+ _supervisors = new HashSet<String>();
+ _admins = new HashSet<String>();
+ if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) {
+ _supervisors.addAll((List<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS));
+ }
+ if (conf.containsKey(Config.NIMBUS_ADMINS)) {
+ _admins.addAll((List<String>)conf.get(Config.NIMBUS_ADMINS));
+ }
+ if (conf.containsKey(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED)) {
+ doAclValidation = (boolean)conf.get(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED);
+ }
+ }
+
+ /** Maps "other"/"o" or "user"/"u" (case-insensitive) to its AccessControlType; anything else is rejected. */
+ private static AccessControlType parseACLType(String type) {
+     boolean other = "o".equalsIgnoreCase(type) || "other".equalsIgnoreCase(type);
+     if (!other && !"u".equalsIgnoreCase(type) && !"user".equalsIgnoreCase(type)) {
+         throw new IllegalArgumentException(type+" is not a valid access control type");
+     }
+     return other ? AccessControlType.OTHER : AccessControlType.USER;
+ }
+
+ /** Parses an access string such as "rwa" or "r--" into a bit mask of READ, WRITE and ADMIN. */
+ private static int parseAccess(String access) {
+     int ret = 0;
+     for (char c : access.toCharArray()) {
+         switch (c) {
+             case 'r': ret |= READ; break;
+             case 'w': ret |= WRITE; break;
+             case 'a': ret |= ADMIN; break;
+             case '-': break; // placeholder for a permission that is not granted
+             default:
+                 // Name the offending character and input; this message used to be empty.
+                 throw new IllegalArgumentException(
+                     "'" + c + "' in \"" + access + "\" is not a valid access character (expected r, w, a or -)");
+         }
+     }
+     return ret;
+ }
+
+ public static AccessControl parseAccessControl(String str) {
+ String[] parts = str.split(":");
+ String type = "other";
+ String name = "";
+ String access = "-";
+ if (parts.length > 3) {
+ throw new IllegalArgumentException("Don't know how to parse "+str+" into an ACL value");
+ } else if (parts.length == 1) {
+ type = "other";
+ name = "";
+ access = parts[0];
+ } else if (parts.length == 2) {
+ type = "user";
+ name = parts[0];
+ access = parts[1];
+ } else if (parts.length == 3) {
+ type = parts[0];
+ name = parts[1];
+ access = parts[2];
+ }
+ AccessControl ret = new AccessControl();
+ ret.set_type(parseACLType(type));
+ ret.set_name(name);
+ ret.set_access(parseAccess(access));
+ return ret;
+ }
+
+ /** Renders an access mask as three characters, "r", "w", "a" in that order, with "-" for each unset bit. */
+ private static String accessToString(int access) {
+     String read = (access & READ) > 0 ? "r" : "-";
+     String write = (access & WRITE) > 0 ? "w" : "-";
+     String admin = (access & ADMIN) > 0 ? "a" : "-";
+     return read + write + admin;
+ }
+
+ public static String accessControlToString(AccessControl ac) {
+ StringBuilder ret = new StringBuilder();
+ switch(ac.get_type()) {
+ case OTHER:
+ ret.append("o");
+ break;
+ case USER:
+ ret.append("u");
+ break;
+ default:
+ throw new IllegalArgumentException("Don't know what a type of "+ac.get_type()+" means ");
+ }
+ ret.append(":");
+ if (ac.is_set_name()) {
+ ret.append(ac.get_name());
+ }
+ ret.append(":");
+ ret.append(accessToString(ac.get_access()));
+ return ret.toString();
+ }
+
+ public static void validateSettableACLs(String key, List<AccessControl> acls) throws AuthorizationException {
+ Set<String> aclUsers = new HashSet<>();
+ List<String> duplicateUsers = new ArrayList<>();
+ for (AccessControl acl : acls) {
+ String aclUser = acl.get_name();
+ if (!StringUtils.isEmpty(aclUser) && !aclUsers.add(aclUser)) {
+ LOG.error("'{}' user can't appear more than once in the ACLs", aclUser);
+ duplicateUsers.add(aclUser);
+ }
+ }
+ if (duplicateUsers.size() > 0) {
+ String errorMessage = "user " + Arrays.toString(duplicateUsers.toArray())
+ + " can't appear more than once in the ACLs for key [" + key +"].";
+ throw new AuthorizationException(errorMessage);
+ }
+ }
+
+ private Set<String> constructUserFromPrincipals(Subject who) {
+ Set<String> user = new HashSet<String>();
+ if (who != null) {
+ for (Principal p : who.getPrincipals()) {
+ user.add(_ptol.toLocal(p));
+ }
+ }
+ return user;
+ }
+
+ /** Returns true when any local user name derived from {@code who} is in the configured admin set. */
+ private boolean isAdmin(Subject who) {
+     for (String localName : constructUserFromPrincipals(who)) {
+         if (_admins.contains(localName)) {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ /**
+  * Tells whether {@code operation} is exactly the READ mask, i.e. no other permission bit is set.
+  */
+ private boolean isReadOperation(int operation) {
+     return operation == READ; // named constant instead of the bare literal 1
+ }
+
+ /**
+  * Returns true only for a read operation requested by a subject with a local name in the supervisor set.
+  */
+ private boolean isSupervisor(Subject who, int operation) {
+     Set<String> localNames = constructUserFromPrincipals(who);
+     if (!isReadOperation(operation)) {
+         return false;
+     }
+     localNames.retainAll(_supervisors); // keeps only the names that are also supervisor users
+     return !localNames.isEmpty();
+ }
+
+ /** Returns true when {@code who} is non-null and carries at least one NimbusPrincipal. */
+ private boolean isNimbus(Subject who) {
+     if (who == null) {
+         return false;
+     }
+     for (Principal candidate : who.getPrincipals()) {
+         if (candidate instanceof NimbusPrincipal) {
+             // One match is enough; later principals cannot change the answer.
+             return true;
+         }
+     }
+     return false;
+ }
+
+ public boolean checkForValidUsers(Subject who, int mask) {
+ return isNimbus(who) || isAdmin(who) || isSupervisor(who,mask);
+ }
+
+ /**
+ * The user should be able to see the metadata if and only if they have any of READ, WRITE, or ADMIN
+ */
+ public void validateUserCanReadMeta(List<AccessControl> acl, Subject who, String key) throws AuthorizationException {
+ hasAnyPermissions(acl, (READ|WRITE|ADMIN), who, key);
+ }
+
+ /**
+ * Validates if the user has any of the permissions
+ * mentioned in the mask.
+ * @param acl ACL for the key.
+ * @param mask mask holds the cumulative value of
+ * READ = 1, WRITE = 2 or ADMIN = 4 permissions.
+ * mask = 1 implies READ privilege.
+ * mask = 5 implies READ and ADMIN privileges.
+ * @param who Is the user against whom the permissions
+ * are validated for a key using the ACL and the mask.
+ * @param key Key used to identify the blob.
+ * @throws AuthorizationException
+ */
+ public void hasAnyPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException {
+ if (!doAclValidation) {
+ return;
+ }
+ Set<String> user = constructUserFromPrincipals(who);
+ LOG.debug("user {}", user);
+ if (checkForValidUsers(who, mask)) {
+ return;
+ }
+ for (AccessControl ac : acl) {
+ int allowed = getAllowed(ac, user);
+ LOG.debug(" user: {} allowed: {} key: {}", user, allowed, key);
+ if ((allowed & mask) > 0) {
+ return;
+ }
+ }
+ throw new AuthorizationException(
+ user + " does not have access to " + key);
+ }
+
+ /**
+ * Validates if the user has at least the set of permissions
+ * mentioned in the mask.
+ * @param acl ACL for the key.
+ * @param mask mask holds the cumulative value of
+ * READ = 1, WRITE = 2 or ADMIN = 4 permissions.
+ * mask = 1 implies READ privilege.
+ * mask = 5 implies READ and ADMIN privileges.
+ * @param who Is the user against whom the permissions
+ * are validated for a key using the ACL and the mask.
+ * @param key Key used to identify the blob.
+ * @throws AuthorizationException
+ */
+ public void hasPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException {
+ if (!doAclValidation) {
+ return;
+ }
+ Set<String> user = constructUserFromPrincipals(who);
+ LOG.debug("user {}", user);
+ if (checkForValidUsers(who, mask)) {
+ return;
+ }
+ for (AccessControl ac : acl) {
+ int allowed = getAllowed(ac, user);
+ mask = ~allowed & mask;
+ LOG.debug(" user: {} allowed: {} disallowed: {} key: {}", user, allowed, mask, key);
+ }
+ if (mask == 0) {
+ return;
+ }
+ throw new AuthorizationException(
+ user + " does not have " + namedPerms(mask) + " access to " + key);
+ }
+
+ public void normalizeSettableBlobMeta(String key, SettableBlobMeta meta, Subject who, int opMask) {
+ meta.set_acl(normalizeSettableACLs(key, meta.get_acl(), who, opMask));
+ }
+
+ /**
+  * Formats the permission bits set in {@code mask} as a bracketed list, e.g. "[READ WRITE ]".
+  */
+ private String namedPerms(int mask) {
+     String[] labels = {"READ ", "WRITE ", "ADMIN "};
+     int[] bits = {READ, WRITE, ADMIN};
+     StringBuilder text = new StringBuilder("[");
+     for (int i = 0; i < bits.length; i++) {
+         if ((mask & bits[i]) > 0) {
+             text.append(labels[i]);
+         }
+     }
+     text.append("]");
+     return text.toString();
+ }
+
+ private int getAllowed(AccessControl ac, Set<String> users) {
+ switch (ac.get_type()) {
+ case OTHER:
+ return ac.get_access();
+ case USER:
+ if (users.contains(ac.get_name())) {
+ return ac.get_access();
+ }
+ return 0;
+ default:
+ return 0;
+ }
+ }
+
+ private List<AccessControl> removeBadACLs(List<AccessControl> accessControls) {
+ List<AccessControl> resultAcl = new ArrayList<AccessControl>();
+ for (AccessControl control : accessControls) {
+ if(control.get_type().equals(AccessControlType.OTHER) && (control.get_access() == 0 )) {
+ LOG.debug("Removing invalid blobstore world ACL " +
+ BlobStoreAclHandler.accessControlToString(control));
+ continue;
+ }
+ resultAcl.add(control);
+ }
+ return resultAcl;
+ }
+
+ private final List<AccessControl> normalizeSettableACLs(String key, List<AccessControl> acls, Subject who,
+ int opMask) {
+ List<AccessControl> cleanAcls = removeBadACLs(acls);
+ Set<String> userNames = getUserNamesFromSubject(who);
+ for (String user : userNames) {
+ fixACLsForUser(cleanAcls, user, opMask);
+ }
+ if ((who == null || userNames.isEmpty()) && !worldEverything(acls)) {
+ cleanAcls.addAll(BlobStoreAclHandler.WORLD_EVERYTHING);
+ LOG.debug("Access Control for key {} is normalized to world everything {}", key, cleanAcls);
+ if (!acls.isEmpty())
+ LOG.warn("Access control for blob with key {} is normalized to WORLD_EVERYTHING", key);
+ }
+ return cleanAcls;
+ }
+
+ private boolean worldEverything(List<AccessControl> acls) {
+ boolean isWorldEverything = false;
+ for (AccessControl acl : acls) {
+ if (acl.get_type() == AccessControlType.OTHER && acl.get_access() == (READ|WRITE|ADMIN)) {
+ isWorldEverything = true;
+ break;
+ }
+ }
+ return isWorldEverything;
+ }
+
+ /**
+  * Ensures {@code acls} holds a USER entry for {@code user} granting at least {@code mask}: the first matching
+  * entry is widened in place, otherwise a new entry is appended. USER entries without a name are skipped.
+  */
+ private void fixACLsForUser(List<AccessControl> acls, String user, int mask) {
+     for (AccessControl control : acls) {
+         if (control.get_type() == AccessControlType.USER && control.is_set_name() && control.get_name().equals(user)) {
+             int currentAccess = control.get_access();
+             if ((currentAccess & mask) != mask) {
+                 control.set_access(currentAccess | mask);
+             }
+             return;
+         }
+     }
+     AccessControl userACL = new AccessControl();
+     userACL.set_type(AccessControlType.USER);
+     userACL.set_name(user);
+     userACL.set_access(mask);
+     acls.add(userACL);
+ }
+
+ /**
+  * Returns the local user names of every principal in {@code who}, as mapped by the configured
+  * principal-to-local plugin; the set is empty when {@code who} is null.
+  */
+ private Set<String> getUserNamesFromSubject(Subject who) {
+     // constructUserFromPrincipals performs exactly this mapping, so reuse it instead of
+     // keeping a second copy of the loop.
+     return constructUserFromPrincipals(who);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreFile.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreFile.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreFile.java
new file mode 100644
index 0000000..9de2f4a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreFile.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+import org.apache.storm.generated.SettableBlobMeta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.regex.Pattern;
+
+/**
+ * Provides a base implementation for creating a blobstore based on file-backed storage.
+ */
+public abstract class BlobStoreFile {
+ public static final Logger LOG = LoggerFactory.getLogger(BlobStoreFile.class);
+
+ protected static final String TMP_EXT = ".tmp";
+ protected static final Pattern TMP_NAME_PATTERN = Pattern.compile("^\\d+\\" + TMP_EXT + "$");
+ protected static final String BLOBSTORE_DATA_FILE = "data";
+
+ public abstract void delete() throws IOException;
+ public abstract String getKey();
+ public abstract boolean isTmp();
+ public abstract void setMetadata(SettableBlobMeta meta);
+ public abstract SettableBlobMeta getMetadata();
+ public abstract long getModTime() throws IOException;
+ public abstract InputStream getInputStream() throws IOException;
+ public abstract OutputStream getOutputStream() throws IOException;
+ public abstract void commit() throws IOException;
+ public abstract void cancel() throws IOException;
+ public abstract long getFileLength() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
new file mode 100644
index 0000000..a1499aa
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.NimbusClient;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * The ClientBlobStore has two concrete implementations
+ * 1. NimbusBlobStore
+ * 2. HdfsClientBlobStore
+ *
+ * Create, update, read and delete are some of the basic operations defined by this interface.
+ * Each operation is validated for permissions against a user. We currently have NIMBUS_ADMINS and SUPERVISOR_ADMINS
+ * configuration. NIMBUS_ADMINS are given READ, WRITE and ADMIN access whereas the SUPERVISOR_ADMINS are given READ
+ * access in order to read and download the blobs from the nimbus.
+ *
+ * The ACLs for the blob store are validated against whether the subject is a NIMBUS_ADMIN, SUPERVISOR_ADMIN or USER
+ * who has read, write or admin privileges in order to perform respective operations on the blob.
+ *
+ * For more detailed implementation
+ * @see org.apache.storm.blobstore.NimbusBlobStore
+ * @see org.apache.storm.blobstore.LocalFsBlobStore
+ * @see org.apache.storm.hdfs.blobstore.HdfsClientBlobStore
+ * @see org.apache.storm.hdfs.blobstore.HdfsBlobStore
+ */
+public abstract class ClientBlobStore implements Shutdownable {
+ protected Map conf;
+
+ public interface WithBlobstore {
+ void run(ClientBlobStore blobStore) throws Exception;
+ }
+
+ public static void withConfiguredClient(WithBlobstore withBlobstore) throws Exception {
+ Map<String, Object> conf = ConfigUtils.readStormConfig();
+ ClientBlobStore blobStore = Utils.getClientBlobStore(conf);
+
+ try {
+ withBlobstore.run(blobStore);
+ } finally {
+ blobStore.shutdown();
+ }
+ }
+
+ /**
+ * Sets up the client API by parsing the configs.
+ * @param conf The storm conf containing the config details.
+ */
+ public abstract void prepare(Map conf);
+
+ /**
+ * Client facing API to create a blob.
+ * @param key blob key name.
+ * @param meta contains ACL information.
+ * @return AtomicOutputStream returns an output stream into which data can be written.
+ * @throws AuthorizationException
+ * @throws KeyAlreadyExistsException
+ */
+ protected abstract AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException;
+
+ /**
+ * Client facing API to update a blob.
+ * @param key blob key name.
+ * @return AtomicOutputStream returns an output stream into which data can be written.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract AtomicOutputStream updateBlob(String key) throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Client facing API to read the metadata information.
+ * @param key blob key name.
+ * @return ReadableBlobMeta the metadata information read for the blob.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Client facing API to set the metadata for a blob.
+ * @param key blob key name.
+ * @param meta contains ACL information.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ protected abstract void setBlobMetaToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Client facing API to delete a blob.
+ * @param key blob key name.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Client facing API to read a blob.
+ * @param key blob key name.
+ * @return an InputStreamWithMeta from which the blob can be read.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract InputStreamWithMeta getBlob(String key) throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * @return Iterator for a list of keys currently present in the blob store.
+ */
+ public abstract Iterator<String> listKeys();
+
+ /**
+ * Client facing API to read the replication of a blob.
+ * @param key blob key name.
+ * @return int indicates the replication factor of a blob.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Client facing API to update the replication of a blob.
+ * @param key blob key name.
+ * @param replication int indicates the replication factor the blob is to be set to.
+ * @return int indicates the replication factor of a blob.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Client facing API to set a nimbus client.
+ * @param conf storm conf
+ * @param client NimbusClient
+ * @return indicates whether the client connection has been set up.
+ */
+ public abstract boolean setClient(Map conf, NimbusClient client);
+
+ /**
+ * Creates state inside a zookeeper.
+ * Required for blobstore to write to zookeeper
+ * when Nimbus HA is turned on in order to maintain
+ * state consistency
+ * @param key
+ */
+ public abstract void createStateInZookeeper(String key);
+
+ /**
+ * Client facing API to create a blob.
+ * @param key blob key name.
+ * @param meta contains ACL information.
+ * @return AtomicOutputStream returns an output stream into which data can be written.
+ * @throws AuthorizationException
+ * @throws KeyAlreadyExistsException
+ */
+ public final AtomicOutputStream createBlob(String key, SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException {
+ if (meta !=null && meta.is_set_acl()) {
+ BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+ }
+ return createBlobToExtend(key, meta);
+ }
+
+ /**
+ * Client facing API to set the metadata for a blob.
+ * @param key blob key name.
+ * @param meta contains ACL information.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public final void setBlobMeta(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException {
+ if (meta !=null && meta.is_set_acl()) {
+ BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+ }
+ setBlobMetaToExtend(key, meta);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/InputStreamWithMeta.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/InputStreamWithMeta.java b/storm-client/src/jvm/org/apache/storm/blobstore/InputStreamWithMeta.java
new file mode 100644
index 0000000..6600a00
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/InputStreamWithMeta.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public abstract class InputStreamWithMeta extends InputStream {
+ public abstract long getVersion() throws IOException;
+ public abstract long getFileLength() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/KeyFilter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/KeyFilter.java b/storm-client/src/jvm/org/apache/storm/blobstore/KeyFilter.java
new file mode 100644
index 0000000..c2d69e1
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/KeyFilter.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+public interface KeyFilter<R> {
+ R filter(String key);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
new file mode 100644
index 0000000..5b7713d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.BeginDownloadResult;
+import org.apache.storm.generated.ListBlobsResult;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * NimbusBlobStore is a USER facing client API to perform
+ * basic operations such as create, update, delete and read
+ * for local and hdfs blob store.
+ *
+ * For local blob store it is also the client facing API for
+ * supervisor in order to download blobs from nimbus.
+ */
+public class NimbusBlobStore extends ClientBlobStore implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(NimbusBlobStore.class);
+
+ public class NimbusKeyIterator implements Iterator<String> {
+ private ListBlobsResult listBlobs = null;
+ private int offset = 0;
+ private boolean eof = false;
+
+ public NimbusKeyIterator(ListBlobsResult listBlobs) {
+ this.listBlobs = listBlobs;
+ this.eof = (listBlobs.get_keys_size() == 0);
+ }
+
+ private boolean isCacheEmpty() {
+ return listBlobs.get_keys_size() <= offset;
+ }
+
+ private void readMore() throws TException {
+ if (!eof) {
+ offset = 0;
+ synchronized(client) {
+ listBlobs = client.getClient().listBlobs(listBlobs.get_session());
+ }
+ if (listBlobs.get_keys_size() == 0) {
+ eof = true;
+ }
+ }
+ }
+
+ @Override
+ public synchronized boolean hasNext() {
+ try {
+ if (isCacheEmpty()) {
+ readMore();
+ }
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ return !eof;
+ }
+
+ @Override
+ public synchronized String next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ String ret = listBlobs.get_keys().get(offset);
+ offset++;
+ return ret;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Delete Not Supported");
+ }
+ }
+
+ public class NimbusDownloadInputStream extends InputStreamWithMeta {
+ private BeginDownloadResult beginBlobDownload;
+ private byte[] buffer = null;
+ private int offset = 0;
+ private int end = 0;
+ private boolean eof = false;
+
+ public NimbusDownloadInputStream(BeginDownloadResult beginBlobDownload) {
+ this.beginBlobDownload = beginBlobDownload;
+ }
+
+ @Override
+ public long getVersion() throws IOException {
+ return beginBlobDownload.get_version();
+ }
+
+ /**
+ * Reads the next byte of the blob, downloading another chunk from nimbus once the local buffer is drained.
+ *
+ * @return the byte as an unsigned value in the range 0-255, or -1 once the blob is exhausted
+ * @throws IOException if the chunk download fails with a TException
+ */
+ @Override
+ public synchronized int read() throws IOException {
+ try {
+ if (isEmpty()) {
+ readMore();
+ if (eof) {
+ return -1;
+ }
+ }
+ // Mask to 0-255: a raw byte >= 0x80 would be sign-extended, and 0xFF would come back as -1,
+ // which callers interpret as end of stream. The buffer is known to be non-empty here.
+ int ret = buffer[offset] & 0xFF;
+ offset++;
+ return ret;
+ } catch(TException exp) {
+ throw new IOException(exp);
+ }
+ }
+
+ /**
+ * Copies up to {@code len} bytes of the blob into {@code b}, starting at index {@code off}.
+ * At most the remainder of the currently buffered chunk is copied, so short reads are normal.
+ *
+ * @param b destination array
+ * @param off index in {@code b} of the first byte written
+ * @param len maximum number of bytes to copy
+ * @return number of bytes copied, 0 when {@code len} is 0, or -1 once the blob is exhausted
+ * @throws IOException if the chunk download fails with a TException
+ */
+ @Override
+ public synchronized int read(byte[] b, int off, int len) throws IOException {
+ // InputStream contract: a zero-length request returns 0. Without this guard a drained buffer
+ // would cause a nimbus round trip and could report -1 for a request that asked for nothing.
+ if (len == 0) {
+ return 0;
+ }
+ try {
+ if (isEmpty()) {
+ readMore();
+ if (eof) {
+ return -1;
+ }
+ }
+ int length = Math.min(len, available());
+ System.arraycopy(buffer, offset, b, off, length);
+ offset += length;
+ return length;
+ } catch(TException exp) {
+ throw new IOException(exp);
+ }
+ }
+
+ private boolean isEmpty() {
+ return buffer == null || offset >= end;
+ }
+
+ private void readMore() throws TException {
+ if (!eof) {
+ ByteBuffer buff;
+ synchronized(client) {
+ buff = client.getClient().downloadBlobChunk(beginBlobDownload.get_session());
+ }
+ buffer = buff.array();
+ offset = buff.arrayOffset() + buff.position();
+ int length = buff.remaining();
+ end = offset + length;
+ if (length == 0) {
+ eof = true;
+ }
+ }
+ }
+
+ @Override
+ public synchronized int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public synchronized int available() {
+ return buffer == null ? 0 : (end - offset);
+ }
+
+ @Override
+ public long getFileLength() {
+ return beginBlobDownload.get_data_size();
+ }
+ }
+
+ public class NimbusUploadAtomicOutputStream extends AtomicOutputStream {
+ private String session;
+ private int maxChunkSize = 4096;
+ private String key;
+
+ public NimbusUploadAtomicOutputStream(String session, int bufferSize, String key) {
+ this.session = session;
+ this.maxChunkSize = bufferSize;
+ this.key = key;
+ }
+
+ @Override
+ public void cancel() throws IOException {
+ try {
+ synchronized(client) {
+ client.getClient().cancelBlobUpload(session);
+ }
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ try {
+ synchronized(client) {
+ client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(new byte[] {(byte)b}));
+ }
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void write(byte []b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte []b, int offset, int len) throws IOException {
+ try {
+ int end = offset + len;
+ for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) {
+ int realLen = Math.min(end - realOffset, maxChunkSize);
+ LOG.debug("Writing {} bytes of {} remaining",realLen,(end-realOffset));
+ synchronized(client) {
+ client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(b, realOffset, realLen));
+ }
+ }
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ synchronized(client) {
+ client.getClient().finishBlobUpload(session);
+ client.getClient().createStateInZookeeper(key);
+ }
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private NimbusClient client;
+ private int bufferSize = 4096;
+
+ @Override
+ public void prepare(Map conf) {
+ this.client = NimbusClient.getConfiguredClient(conf);
+ if (conf != null) {
+ this.bufferSize = ObjectReader.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize);
+ }
+ }
+
+ @Override
+ protected AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta)
+ throws AuthorizationException, KeyAlreadyExistsException {
+ try {
+ synchronized(client) {
+ return new NimbusUploadAtomicOutputStream(client.getClient().beginCreateBlob(key, meta), this.bufferSize, key);
+ }
+ } catch (AuthorizationException | KeyAlreadyExistsException exp) {
+ throw exp;
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public AtomicOutputStream updateBlob(String key)
+ throws AuthorizationException, KeyNotFoundException {
+ try {
+ synchronized(client) {
+ return new NimbusUploadAtomicOutputStream(client.getClient().beginUpdateBlob(key), this.bufferSize, key);
+ }
+ } catch (AuthorizationException | KeyNotFoundException exp) {
+ throw exp;
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException {
+ try {
+ synchronized(client) {
+ return client.getClient().getBlobMeta(key);
+ }
+ } catch (AuthorizationException | KeyNotFoundException exp) {
+ throw exp;
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected void setBlobMetaToExtend(String key, SettableBlobMeta meta)
+ throws AuthorizationException, KeyNotFoundException {
+ try {
+ synchronized(client) {
+ client.getClient().setBlobMeta(key, meta);
+ }
+ } catch (AuthorizationException | KeyNotFoundException exp) {
+ throw exp;
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException {
+ try {
+ synchronized(client) {
+ client.getClient().deleteBlob(key);
+ }
+ } catch (AuthorizationException | KeyNotFoundException exp) {
+ throw exp;
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void createStateInZookeeper(String key) {
+ try {
+ synchronized(client) {
+ client.getClient().createStateInZookeeper(key);
+ }
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public InputStreamWithMeta getBlob(String key) throws AuthorizationException, KeyNotFoundException {
+ try {
+ synchronized(client) {
+ return new NimbusDownloadInputStream(client.getClient().beginBlobDownload(key));
+ }
+ } catch (AuthorizationException | KeyNotFoundException exp) {
+ throw exp;
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Iterator<String> listKeys() {
+ try {
+ synchronized(client) {
+ return new NimbusKeyIterator(client.getClient().listBlobs(""));
+ }
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Asks nimbus for the replication of the blob stored under {@code key}.
+ *
+ * @param key blob key
+ * @return the value reported by nimbus
+ * @throws AuthorizationException passed through from nimbus
+ * @throws KeyNotFoundException passed through from nimbus
+ * @throws RuntimeException wrapping any other TException
+ */
+ @Override
+ public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException {
+ try {
+ // Take the same client lock as the other nimbus calls in this class so that requests
+ // issued from different threads are not interleaved on the shared client.
+ synchronized(client) {
+ return client.getClient().getBlobReplication(key);
+ }
+ } catch (AuthorizationException | KeyNotFoundException exp) {
+ throw exp;
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Asks nimbus to change the replication of the blob stored under {@code key}.
+ *
+ * @param key blob key
+ * @param replication requested replication
+ * @return the value reported back by nimbus
+ * @throws AuthorizationException passed through from nimbus
+ * @throws KeyNotFoundException passed through from nimbus
+ * @throws RuntimeException wrapping any other TException
+ */
+ @Override
+ public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException {
+ try {
+ // Take the same client lock as the other nimbus calls in this class so that requests
+ // issued from different threads are not interleaved on the shared client.
+ synchronized(client) {
+ return client.getClient().updateBlobReplication(key, replication);
+ }
+ } catch (AuthorizationException | KeyNotFoundException exp) {
+ throw exp;
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean setClient(Map conf, NimbusClient client) {
+ if (this.client != null) {
+ this.client.close();
+ }
+ this.client = client;
+ if (conf != null) {
+ this.bufferSize = ObjectReader.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize);
+ }
+ return true;
+ }
+
+ @Override
+ protected void finalize() {
+ shutdown();
+ }
+
+ @Override
+ public void shutdown() {
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+ }
+
+ @Override
+ public void close() {
+ shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java b/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
new file mode 100644
index 0000000..007a958
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
@@ -0,0 +1,595 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.bolt;
+
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.TimestampExtractor;
+import org.apache.storm.windowing.TupleWindow;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class JoinBolt extends BaseWindowedBolt {
+
+ private OutputCollector collector;
+
+ // Map[StreamName -> Map[Key -> List<Tuple>] ]
+ HashMap<String, HashMap<Object, ArrayList<Tuple> >> hashedInputs = new HashMap<>(); // holds remaining streams
+
+ // Map[StreamName -> JoinInfo]
+ protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>();
+ protected FieldSelector[] outputFields; // specified via bolt.select() ... used in declaring Output fields
+// protected String[] dotSeparatedOutputFieldNames; // fieldNames in x.y.z format w/o stream name, used for naming output fields
+ protected String outputStreamName;
+
+ // Use streamId, source component name OR field in tuple to distinguish incoming tuple streams
+ public enum Selector { STREAM, SOURCE }
+ protected final Selector selectorType;
+
+
+ /**
+ * Calls JoinBolt(Selector.SOURCE, sourceId, fieldName)
+ * @param sourceId Id of source component (spout/bolt) from which this bolt is receiving data
+ * @param fieldName the field to use for joining the stream (x.y.z format)
+ */
+ public JoinBolt(String sourceId, String fieldName) {
+ this(Selector.SOURCE, sourceId, fieldName);
+ }
+ /**
+ *
+ * Introduces the first stream to start the join with. Equivalent SQL ...
+ * select .... from srcOrStreamId ...
+ * @param type Specifies whether 'srcOrStreamId' refers to stream name/source component
+ * @param srcOrStreamId name of stream OR source component
+ * @param fieldName the field to use for joining the stream (x.y.z format)
+ */
+ public JoinBolt(Selector type, String srcOrStreamId, String fieldName) {
+ selectorType = type;
+
+ joinCriteria.put(srcOrStreamId, new JoinInfo( new FieldSelector( srcOrStreamId, fieldName) ) );
+ }
+
+ /**
+ * Optional. Allows naming the output stream of this bolt. If not specified, the emits will happen on
+ * 'default' stream.
+ */
+ public JoinBolt withOutputStream(String streamName) {
+ this.outputStreamName = streamName;
+ return this;
+ }
+
+ /**
+ * Performs inner Join with the newStream.
+ * SQL : from priorStream inner join newStream on newStream.field = priorStream.field1
+ * same as: new JoinBolt(priorStream,field1). join(newStream, field, priorStream);
+ *
+ * Note: priorStream must be previously joined.
+ * Valid ex: new JoinBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2);
+ * Invalid ex: new JoinBolt(s1,k1). join(s3,k3, s2). join(s2,k2, s1);
+ *
+ * @param newStream Either stream name or name of upstream component
+ * @param field the field on which to perform the join
+ * @param priorStream a stream (or upstream component) that was already introduced via the constructor or an earlier join
+ * @return this bolt, so that calls can be chained
+ * @throws IllegalArgumentException if newStream is already part of the join or priorStream was not previously declared
+ */
+ public JoinBolt join(String newStream, String field, String priorStream) {
+ return joinCommon(newStream, field, priorStream, JoinType.INNER);
+ }
+
+ /**
+ * Performs left Join with the newStream.
+ * SQL : from stream1 left join stream2 on stream2.field = stream1.field1
+ * same as: new JoinBolt(stream1, field1). leftJoin(stream2, field, stream1);
+ *
+ * Note: priorStream must be previously joined
+ * Valid ex: new JoinBolt(s1,k1). leftJoin(s2,k2, s1). leftJoin(s3,k3, s2);
+ * Invalid ex: new JoinBolt(s1,k1). leftJoin(s3,k3, s2). leftJoin(s2,k2, s1);
+ *
+ * @param newStream Either a name of a stream or an upstream component
+ * @param field the field on which to perform the join
+ * @param priorStream a stream (or upstream component) that was already introduced via the constructor or an earlier join
+ * @return this bolt, so that calls can be chained
+ * @throws IllegalArgumentException if newStream is already part of the join or priorStream was not previously declared
+ */
+ public JoinBolt leftJoin(String newStream, String field, String priorStream) {
+ return joinCommon(newStream, field, priorStream, JoinType.LEFT);
+ }
+
+ /**
+ * Shared implementation of join() and leftJoin(): registers newStream and records how it joins with priorStream.
+ *
+ * @param newStream stream (or source component) being added to the join
+ * @param fieldDescriptor join field of newStream in x.y.z format, without a 'stream:' qualifier
+ * @param priorStream a stream that was already introduced via the constructor or an earlier join
+ * @param joinType kind of join to perform
+ * @return this bolt, so that calls can be chained
+ * @throws IllegalArgumentException if newStream is already part of the join, priorStream was not previously
+ * declared, or fieldDescriptor contains a ':' stream qualifier
+ */
+ private JoinBolt joinCommon(String newStream, String fieldDescriptor, String priorStream, JoinType joinType) {
+ if (hashedInputs.containsKey(newStream)) {
+ throw new IllegalArgumentException("'" + newStream + "' is already part of join. Cannot join with it more than once.");
+ }
+ JoinInfo joinInfo = joinCriteria.get(priorStream);
+ if (joinInfo == null) {
+ throw new IllegalArgumentException("Stream '" + priorStream + "' was not previously declared");
+ }
+
+ // Build everything that can throw before touching any state. Otherwise a rejected call would leave
+ // newStream registered in hashedInputs and a corrected retry would fail with "already part of join".
+ FieldSelector field = new FieldSelector(newStream, fieldDescriptor);
+ JoinInfo newJoinInfo = new JoinInfo(field, priorStream, joinInfo, joinType);
+
+ hashedInputs.put(newStream, new HashMap<Object, ArrayList<Tuple>>());
+ joinCriteria.put(newStream, newJoinInfo);
+ return this;
+ }
+
+ /**
+ * Specifies the projection, i.e. the fields to include in the output.
+ * e.g: .select("field1, stream2:field2, field3")
+ * Nested key names are supported for nested types:
+ * e.g: .select("outerKey1.innerKey1, outerKey1.innerKey2, stream3:outerKey2.innerKey3")
+ * Inner types (non leaf) must be Map<> in order to support nested lookup using this dot notation.
+ * The selected fields implicitly declare the output field names of the bolt.
+ * @param commaSeparatedKeys comma separated field descriptors, each optionally prefixed with 'streamName:'
+ * @return this bolt, so that calls can be chained
+ */
+ public JoinBolt select(String commaSeparatedKeys) {
+ String[] fieldNames = commaSeparatedKeys.split(",");
+
+ outputFields = new FieldSelector[fieldNames.length];
+ for (int i = 0; i < fieldNames.length; i++) {
+ outputFields[i] = new FieldSelector(fieldNames[i]);
+ }
+ return this;
+ }
+
+ /**
+ * Declares one output field per selector given to select(), on the stream named via withOutputStream()
+ * or on the default stream when no name was given.
+ *
+ * @param declarer receives the declaration
+ * @throws IllegalArgumentException if select() was never called
+ */
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // Without this guard a missing select() surfaces as a bare NullPointerException on outputFields.length.
+ // The message matches the one prepare() uses for the same mistake.
+ if (outputFields == null) {
+ throw new IllegalArgumentException("Must specify output fields via .select() method.");
+ }
+ String[] outputFieldNames = new String[outputFields.length];
+ for (int i = 0; i < outputFields.length; ++i) {
+ outputFieldNames[i] = outputFields[i].getOutputName();
+ }
+ if (outputStreamName != null) {
+ declarer.declareStream(outputStreamName, new Fields(outputFieldNames));
+ } else {
+ declarer.declare(new Fields(outputFieldNames));
+ }
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ // initialize the hashedInputs data structure
+ int i=0;
+ for ( String stream : joinCriteria.keySet() ) {
+ if(i>0) {
+ hashedInputs.put(stream, new HashMap<Object, ArrayList<Tuple>>());
+ }
+ ++i;
+ }
+ if(outputFields ==null) {
+ throw new IllegalArgumentException("Must specify output fields via .select() method.");
+ }
+ }
+
+ /**
+ * Joins the tuples of the given window and emits one output tuple per joined record.
+ */
+ @Override
+ public void execute(TupleWindow inputWindow) {
+ // Step 1: join everything in the current window
+ JoinAccumulator joined = hashJoin(inputWindow.get());
+
+ // Step 2: emit. Each result is anchored explicitly to just the input tuples it was built from,
+ // as default window anchoring will anchor them to all tuples in window.
+ for (ResultRecord joinedRecord : joined.getRecords()) {
+ ArrayList<Object> values = joinedRecord.getOutputFields();
+ if (outputStreamName != null) {
+ collector.emit(outputStreamName, joinedRecord.tupleList, values);
+ } else {
+ collector.emit(joinedRecord.tupleList, values);
+ }
+ }
+ }
+
+ private void clearHashedInputs() {
+ for (HashMap<Object, ArrayList<Tuple>> mappings : hashedInputs.values()) {
+ mappings.clear();
+ }
+ }
+
+ protected JoinAccumulator hashJoin(List<Tuple> tuples) {
+ clearHashedInputs();
+
+ JoinAccumulator probe = new JoinAccumulator();
+
+ // 1) Build phase - Segregate tuples in the Window into streams.
+ // First stream's tuples go into probe, rest into HashMaps in hashedInputs
+ String firstStream = joinCriteria.keySet().iterator().next();
+ for (Tuple tuple : tuples) {
+ String streamId = getStreamSelector(tuple);
+ if ( ! streamId.equals(firstStream) ) {
+ Object field = getJoinField(streamId, tuple);
+ ArrayList<Tuple> recs = hashedInputs.get(streamId).get(field);
+ if(recs == null) {
+ recs = new ArrayList<Tuple>();
+ hashedInputs.get(streamId).put(field, recs);
+ }
+ recs.add(tuple);
+
+ } else {
+ ResultRecord probeRecord = new ResultRecord(tuple, joinCriteria.size() == 1);
+ probe.insert( probeRecord ); // first stream's data goes into the probe
+ }
+ }
+
+ // 2) Join the streams in order of streamJoinOrder
+ int i=0;
+ for (String streamName : joinCriteria.keySet() ) {
+ boolean finalJoin = (i==joinCriteria.size()-1);
+ if(i>0) {
+ probe = doJoin(probe, hashedInputs.get(streamName), joinCriteria.get(streamName), finalJoin);
+ }
+ ++i;
+ }
+
+
+ return probe;
+ }
+
+ // Dispatches to the right join method (inner/left/right/outer) based on the joinInfo.joinType
+ protected JoinAccumulator doJoin(JoinAccumulator probe, HashMap<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
+ final JoinType joinType = joinInfo.getJoinType();
+ switch ( joinType ) {
+ case INNER:
+ return doInnerJoin(probe, buildInput, joinInfo, finalJoin);
+ case LEFT:
+ return doLeftJoin(probe, buildInput, joinInfo, finalJoin);
+ case RIGHT:
+ case OUTER:
+ default:
+ throw new RuntimeException("Unsupported join type : " + joinType.name() );
+ }
+ }
+
+ // inner join - core implementation
+ protected JoinAccumulator doInnerJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
+ String[] probeKeyName = joinInfo.getOtherField();
+ JoinAccumulator result = new JoinAccumulator();
+ FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
+ for (ResultRecord rec : probe.getRecords()) {
+ Object probeKey = rec.getField(fieldSelector);
+ if (probeKey!=null) {
+ ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey);
+ if(matchingBuildRecs!=null) {
+ for (Tuple matchingRec : matchingBuildRecs) {
+ ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
+ result.insert(mergedRecord);
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ // left join - core implementation
+ // For every probe record, looks up its join key in buildInput. Each matching build tuple yields one
+ // merged record; a probe record without any match is carried over on its own (rhs == null).
+ // Note: probe records whose join key resolves to null are skipped entirely - they are neither
+ // matched nor carried over.
+ // 'finalJoin' is handed to ResultRecord, which only computes output fields when it is true.
+ protected JoinAccumulator doLeftJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
+ String[] probeKeyName = joinInfo.getOtherField();
+ JoinAccumulator result = new JoinAccumulator();
+ FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
+ for (ResultRecord rec : probe.getRecords()) {
+ Object probeKey = rec.getField(fieldSelector);
+ if (probeKey!=null) {
+ ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey); // ok if it returns null
+ if (matchingBuildRecs!=null && !matchingBuildRecs.isEmpty() ) {
+ for (Tuple matchingRec : matchingBuildRecs) {
+ ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
+ result.insert(mergedRecord);
+ }
+ } else {
+ ResultRecord mergedRecord = new ResultRecord(rec, null, finalJoin);
+ result.insert(mergedRecord);
+ }
+
+ }
+ }
+ return result;
+ }
+
+
+ // Identify the join field for the stream, and look it up in 'tuple'. field can be nested field: outerKey.innerKey
+ private Object getJoinField(String streamId, Tuple tuple) {
+ JoinInfo ji = joinCriteria.get(streamId);
+ if(ji==null) {
+ throw new RuntimeException("Join information for '" + streamId + "' not found. Check the join clauses.");
+ }
+ return lookupField(ji.getJoinField(), tuple);
+ }
+
+ // Returns either the source component name or the stream name for the tuple
+ private String getStreamSelector(Tuple ti) {
+ switch (selectorType) {
+ case STREAM:
+ return ti.getSourceStreamId();
+ case SOURCE:
+ return ti.getSourceComponent();
+ default:
+ throw new RuntimeException(selectorType + " stream selector type not yet supported");
+ }
+ }
+
+
+ protected enum JoinType {INNER, LEFT, RIGHT, OUTER}
+
+ /** Describes how to join the other stream with the current stream */
+ protected static class JoinInfo implements Serializable {
+ final static long serialVersionUID = 1L;
+
+ private JoinType joinType; // nature of join
+ private FieldSelector field; // field for the current stream
+ private FieldSelector other; // field for the other (2nd) stream
+
+
+ public JoinInfo(FieldSelector field) {
+ this.joinType = null;
+ this.field = field;
+ this.other = null;
+ }
+ public JoinInfo(FieldSelector field, String otherStream, JoinInfo otherStreamJoinInfo, JoinType joinType) {
+ this.joinType = joinType;
+ this.field = field;
+ this.other = new FieldSelector(otherStream, otherStreamJoinInfo.field.getOutputName() );
+ }
+
+ public FieldSelector getJoinField() {
+ return field;
+ }
+
+ public String getOtherStream() {
+ return other.getStreamName();
+ }
+
+ public String[] getOtherField() {
+ return other.getField();
+ }
+
+ public JoinType getJoinType() {
+ return joinType;
+ }
+
+ } // class JoinInfo
+
+ // Join helper to concat fields to the record
+ protected class ResultRecord {
+
+ ArrayList<Tuple> tupleList = new ArrayList<>(); // contains one Tuple per Stream being joined
+ ArrayList<Object> outFields = null; // refs to fields that will be part of output fields
+
+ // 'generateOutputFields' enables us to avoid projection unless it is the final stream being joined
+ public ResultRecord(Tuple tuple, boolean generateOutputFields) {
+ tupleList.add(tuple);
+ if(generateOutputFields) {
+ outFields = doProjection(tupleList, outputFields);
+ }
+ }
+
+ public ResultRecord(ResultRecord lhs, Tuple rhs, boolean generateOutputFields) {
+ if(lhs!=null)
+ tupleList.addAll(lhs.tupleList);
+ if(rhs!=null)
+ tupleList.add(rhs);
+ if(generateOutputFields) {
+ outFields = doProjection(tupleList, outputFields);
+ }
+ }
+
+ public ArrayList<Object> getOutputFields() {
+ return outFields;
+ }
+
+
+ // 'stream' cannot be null,
+ public Object getField(FieldSelector fieldSelector) {
+ for (Tuple tuple : tupleList) {
+ Object result = lookupField(fieldSelector, tuple);
+ if (result!=null)
+ return result;
+ }
+ return null;
+ }
+ }
+
+ protected class JoinAccumulator {
+ ArrayList<ResultRecord> records = new ArrayList<>();
+
+ public void insert(ResultRecord tuple) {
+ records.add( tuple );
+ }
+
+ public Collection<ResultRecord> getRecords() {
+ return records;
+ }
+ }
+
+ // Projects 'tuples' onto 'projectionFields': one output slot per selector, filled with the first
+ // non-null value any of the tuples yields for it.
+ protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, FieldSelector[] projectionFields) {
+ ArrayList<Object> projected = new ArrayList<>(projectionFields.length);
+ // Todo: optimize this computation... perhaps inner loop can be outside to avoid rescanning tuples
+ for (FieldSelector selector : projectionFields) {
+ Object value = null;
+ for (Tuple candidate : tuples) {
+ value = lookupField(selector, candidate);
+ if (value != null) {
+ break;
+ }
+ }
+ // value is still null when no tuple had the field (usually in case of outer joins)
+ projected.add(value);
+ }
+ return projected;
+ }
+
+ /**
+ * Parsed form of a field reference such as "stream1:x.y.z" or "x.y.z".
+ */
+ protected static class FieldSelector implements Serializable {
+ String streamName; // can be null;
+ String[] field; // nested field "x.y.z" becomes => String["x","y","z"]
+ String outputName; // either "stream1:x.y.z" or "x.y.z" depending on whether stream name is present.
+
+ /**
+ * Parses a descriptor that may carry a 'stream:' qualifier. Surrounding whitespace is ignored.
+ *
+ * @param fieldDescriptor e.g. "stream1:x.y.z" or "x.y.z"
+ */
+ public FieldSelector(String fieldDescriptor) { // sample fieldDescriptor = "stream1:x.y.z"
+ // Trim once up front: select() splits on ',' only, so entries such as " stream2:field2 " arrive padded.
+ // Padding left inside the field name would never match a tuple field.
+ String descriptor = fieldDescriptor.trim();
+ int pos = descriptor.indexOf(':');
+
+ if (pos > 0) { // stream name is specified
+ streamName = descriptor.substring(0, pos).trim();
+ outputName = descriptor;
+ field = descriptor.substring(pos + 1).trim().split("\\.");
+ return;
+ }
+
+ // stream name unspecified; a leading ':' (pos == 0) is dropped
+ streamName = null;
+ outputName = (pos == 0) ? descriptor.substring(1).trim() : descriptor;
+ field = outputName.split("\\.");
+ }
+
+ /**
+ * @param stream name of stream
+ * @param fieldDescriptor Simple fieldDescriptor like "x.y.z" and w/o a 'stream1:' stream qualifier.
+ * @throws IllegalArgumentException if fieldDescriptor contains a ':'
+ */
+ public FieldSelector(String stream, String fieldDescriptor) {
+ this(fieldDescriptor);
+ if (fieldDescriptor.indexOf(":") >= 0) {
+ throw new IllegalArgumentException("Not expecting stream qualifier ':' in '" + fieldDescriptor
+ + "'. Stream name '" + stream + "' is implicit in this context");
+ }
+ this.streamName = stream;
+ }
+
+ /**
+ * @param stream name of stream
+ * @param field nested field path, one element per level; joined with '.' and parsed like "x.y.z"
+ */
+ public FieldSelector(String stream, String[] field) {
+ this( stream, String.join(".", field) );
+ }
+
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public String[] getField() {
+ return field;
+ }
+
+ public String getOutputName() {
+ return toString();
+ }
+
+ @Override
+ public String toString() {
+ return outputName;
+ }
+ }
+
+ // Extract the field from tuple. Field may be nested field (x.y.z)
+ // Returns null when the selector names a different stream than the tuple's, when the tuple lacks the
+ // top-level field, or when a nested key resolves to null.
+ // Every level below the first is read through a cast to Map, so a non-Map intermediate value raises
+ // ClassCastException.
+ protected Object lookupField(FieldSelector fieldSelector, Tuple tuple) {
+
+ // verify stream name matches, if stream name was specified (comparison ignores case)
+ if ( fieldSelector.streamName!=null &&
+ !fieldSelector.streamName.equalsIgnoreCase( getStreamSelector(tuple) ) ) {
+ return null;
+ }
+
+ Object curr = null;
+ for (int i=0; i < fieldSelector.field.length; i++) {
+ if (i==0) {
+ // first path element is a tuple field
+ if (tuple.contains(fieldSelector.field[i]) )
+ curr = tuple.getValueByField(fieldSelector.field[i]);
+ else
+ return null;
+ } else {
+ // remaining path elements are keys into nested Maps
+ curr = ((Map) curr).get(fieldSelector.field[i]);
+ if (curr==null)
+ return null;
+ }
+ }
+ return curr;
+ }
+
+ // Boilerplate overrides to cast result from base type to JoinBolt, so user doesn't have to
+ // down cast when invoking these methods
+
+ @Override
+ public JoinBolt withWindow(Count windowLength, Count slidingInterval) {
+ return (JoinBolt) super.withWindow(windowLength, slidingInterval);
+ }
+
+ @Override
+ public JoinBolt withWindow(Count windowLength, Duration slidingInterval) {
+ return (JoinBolt) super.withWindow(windowLength, slidingInterval);
+ }
+
+ @Override
+ public JoinBolt withWindow(Duration windowLength, Count slidingInterval) {
+ return (JoinBolt) super.withWindow(windowLength, slidingInterval);
+ }
+
+ @Override
+ public JoinBolt withWindow(Duration windowLength, Duration slidingInterval) {
+ return (JoinBolt) super.withWindow(windowLength, slidingInterval);
+ }
+
+ @Override
+ public JoinBolt withWindow(Count windowLength) {
+ return (JoinBolt) super.withWindow(windowLength);
+ }
+
+ @Override
+ public JoinBolt withWindow(Duration windowLength) {
+ return (JoinBolt) super.withWindow(windowLength);
+ }
+
+ @Override
+ public JoinBolt withTumblingWindow(Count count) {
+ return (JoinBolt) super.withTumblingWindow(count);
+ }
+
+ @Override
+ public JoinBolt withTumblingWindow(Duration duration) {
+ return (JoinBolt) super.withTumblingWindow(duration);
+ }
+
+ @Override
+ public JoinBolt withTimestampField(String fieldName) {
+ return (JoinBolt) super.withTimestampField(fieldName);
+ }
+
+ @Override
+ public JoinBolt withTimestampExtractor(TimestampExtractor timestampExtractor) {
+ return (JoinBolt) super.withTimestampExtractor(timestampExtractor);
+ }
+
+ @Override
+ public JoinBolt withLateTupleStream(String streamId) {
+ return (JoinBolt) super.withLateTupleStream(streamId);
+ }
+
+ // Declared to return JoinBolt (a covariant override), like the other with*() overrides above,
+ // so that the fluent chain keeps its JoinBolt type and the user doesn't have to down cast.
+ @Override
+ public JoinBolt withLag(Duration duration) {
+ return (JoinBolt) super.withLag(duration);
+ }
+
+ // Declared to return JoinBolt (a covariant override), like the other with*() overrides above,
+ // so that the fluent chain keeps its JoinBolt type and the user doesn't have to down cast.
+ @Override
+ public JoinBolt withWatermarkInterval(Duration interval) {
+ return (JoinBolt) super.withWatermarkInterval(interval);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java b/storm-client/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java
new file mode 100644
index 0000000..043dd0c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.callback;
+
+import org.apache.storm.zookeeper.ZkEventTypes;
+import org.apache.storm.zookeeper.ZkKeeperStates;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link WatcherCallBack} implementation whose only action is to write the received event to the
+ * debug log.
+ */
+public class DefaultWatcherCallBack implements WatcherCallBack {
+
+    // Never reassigned, so declared final.
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultWatcherCallBack.class);
+
+    /**
+     * Logs the event at debug level.
+     *
+     * @param state keeper state of the event; rendered with {@code ZkKeeperStates.getStateName}
+     * @param type  type of the event; rendered with {@code ZkEventTypes.getTypeName}
+     * @param path  path reported with the event; logged as given
+     */
+    @Override
+    public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) {
+        LOG.debug("Zookeeper state update: {}, {}, {}", ZkKeeperStates.getStateName(state), ZkEventTypes.getTypeName(type), path);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/callback/WatcherCallBack.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/callback/WatcherCallBack.java b/storm-client/src/jvm/org/apache/storm/callback/WatcherCallBack.java
new file mode 100644
index 0000000..41a50ec
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/callback/WatcherCallBack.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.callback;
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Callback that is handed a ZooKeeper watcher event as its keeper state, event type and path.
+ */
+public interface WatcherCallBack {
+    /**
+     * Handles a single watcher event.
+     *
+     * @param state the {@code Watcher.Event.KeeperState} of the event
+     * @param type  the {@code Watcher.Event.EventType} of the event
+     * @param path  the path associated with the event
+     */
+    void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/callback/ZKStateChangedCallback.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/callback/ZKStateChangedCallback.java b/storm-client/src/jvm/org/apache/storm/callback/ZKStateChangedCallback.java
new file mode 100644
index 0000000..75b0e99
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/callback/ZKStateChangedCallback.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.callback;
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Callback that is handed a ZooKeeper event type together with the path it refers to.
+ */
+public interface ZKStateChangedCallback {
+    /**
+     * Handles a single change notification.
+     *
+     * @param type the {@code Watcher.Event.EventType} of the change
+     * @param path the path associated with the change
+     */
+    void changed(Watcher.Event.EventType type, String path);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java b/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java
new file mode 100644
index 0000000..60300e2
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import clojure.lang.IFn;
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.Keyword;
+import clojure.lang.Symbol;
+import clojure.lang.RT;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * {@link IRichBolt} that forwards its work to an {@link IBolt} built by Clojure functions.
+ *
+ * <p>In {@link #prepare} the function identified by {@code fnSpec} is loaded through
+ * {@code Utils.loadClojureFn} and applied to {@code params}; the function it returns is then applied
+ * to the topology configuration, the context and a collector map, and that result becomes the
+ * delegate bolt.
+ */
+public class ClojureBolt implements IRichBolt, FinishedCallback {
+    Map<String, StreamInfo> _fields;
+    List<String> _fnSpec;
+    List<String> _confSpec;
+    List<Object> _params;
+
+    // Delegate created in prepare(); null until then.
+    IBolt _bolt;
+
+    /**
+     * Stores the specs and parameters; nothing is loaded until {@link #prepare} or
+     * {@link #getComponentConfiguration} is called.
+     *
+     * @param fnSpec   list whose elements 0 and 1 are handed to {@code Utils.loadClojureFn} to find
+     *                 the bolt factory (presumably namespace and function name - confirm against Utils)
+     * @param confSpec list whose elements 0 and 1 are handed to {@code Utils.loadClojureFn} to find
+     *                 the function that produces the component configuration
+     * @param params   arguments applied to both loaded functions
+     * @param fields   output stream id mapped to its {@link StreamInfo}
+     */
+    public ClojureBolt(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
+        _fnSpec = fnSpec;
+        _confSpec = confSpec;
+        _params = params;
+        _fields = fields;
+    }
+
+    /**
+     * Builds the delegate bolt from the Clojure factory and then prepares it.
+     *
+     * @throws RuntimeException wrapping any {@link Exception} raised while building or preparing
+     *                          the delegate
+     */
+    @Override
+    public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
+        IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
+        try {
+            IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
+            Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
+                Keyword.intern(Symbol.create("output-collector")), collector,
+                Keyword.intern(Symbol.create("context")), context});
+            // A plain ArrayList is used instead of double-brace initialization: the anonymous
+            // subclass that idiom creates keeps a hidden reference to this bolt.
+            List<Object> args = new ArrayList<Object>(3);
+            args.add(stormConf);
+            args.add(context);
+            args.add(collectorMap);
+
+            _bolt = (IBolt) preparer.applyTo(RT.seq(args));
+            //this is kind of unnecessary for clojure
+            try {
+                _bolt.prepare(stormConf, context, collector);
+            } catch (AbstractMethodError ignored) {
+                // Deliberately ignored: presumably the Clojure-built delegate may leave
+                // prepare unimplemented, in which case there is nothing to do.
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Forwards the tuple to the delegate bolt. */
+    @Override
+    public void execute(Tuple input) {
+        _bolt.execute(input);
+    }
+
+    /** Forwards cleanup to the delegate bolt. */
+    @Override
+    public void cleanup() {
+        try {
+            _bolt.cleanup();
+        } catch (AbstractMethodError ignored) {
+            // Deliberately ignored: presumably the delegate may leave cleanup unimplemented.
+        }
+    }
+
+    /** Declares one stream per entry of the {@code fields} map given to the constructor. */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (Map.Entry<String, StreamInfo> entry : _fields.entrySet()) {
+            StreamInfo info = entry.getValue();
+            declarer.declareStream(entry.getKey(), info.is_direct(), new Fields(info.get_output_fields()));
+        }
+    }
+
+    /** Forwards the notification only when the delegate itself implements {@link FinishedCallback}. */
+    @Override
+    public void finishedId(Object id) {
+        if (_bolt instanceof FinishedCallback) {
+            ((FinishedCallback) _bolt).finishedId(id);
+        }
+    }
+
+    /**
+     * Loads the function identified by {@code confSpec} and applies it to {@code params}.
+     *
+     * @return the value produced by that function, cast to a map
+     * @throws RuntimeException wrapping any {@link Exception} raised by the function
+     */
+    @Override
+    @SuppressWarnings("unchecked") // the Clojure function's result is untyped
+    public Map<String, Object> getComponentConfiguration() {
+        IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
+        try {
+            return (Map<String, Object>) hof.applyTo(RT.seq(_params));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java b/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java
new file mode 100644
index 0000000..372b306
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import clojure.lang.IFn;
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.Keyword;
+import clojure.lang.Symbol;
+import clojure.lang.RT;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link IRichSpout} that forwards its work to an {@link ISpout} built by Clojure functions.
+ *
+ * <p>In {@link #open} the function identified by {@code fnSpec} is loaded through
+ * {@code Utils.loadClojureFn} and applied to {@code params}; the function it returns is then applied
+ * to the configuration, the context and a collector map, and that result becomes the delegate spout.
+ * Every lifecycle call on the delegate ignores {@link AbstractMethodError}, presumably because the
+ * Clojure-built delegate may leave methods unimplemented.
+ */
+public class ClojureSpout implements IRichSpout {
+    Map<String, StreamInfo> _fields;
+    List<String> _fnSpec;
+    List<String> _confSpec;
+    List<Object> _params;
+
+    // Delegate created in open(); null until then.
+    ISpout _spout;
+
+    /**
+     * Stores the specs and parameters; nothing is loaded until {@link #open} or
+     * {@link #getComponentConfiguration} is called.
+     *
+     * @param fnSpec   list whose elements 0 and 1 are handed to {@code Utils.loadClojureFn} to find
+     *                 the spout factory (presumably namespace and function name - confirm against Utils)
+     * @param confSpec list whose elements 0 and 1 are handed to {@code Utils.loadClojureFn} to find
+     *                 the function that produces the component configuration
+     * @param params   arguments applied to both loaded functions
+     * @param fields   output stream id mapped to its {@link StreamInfo}
+     */
+    public ClojureSpout(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
+        _fnSpec = fnSpec;
+        _confSpec = confSpec;
+        _params = params;
+        _fields = fields;
+    }
+
+    /**
+     * Builds the delegate spout from the Clojure factory and then opens it.
+     *
+     * @throws RuntimeException wrapping any {@link Exception} raised while building or opening
+     *                          the delegate
+     */
+    @Override
+    public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+        IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
+        try {
+            IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
+            Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
+                Keyword.intern(Symbol.create("output-collector")), collector,
+                Keyword.intern(Symbol.create("context")), context});
+            // A plain ArrayList is used instead of double-brace initialization: the anonymous
+            // subclass that idiom creates keeps a hidden reference to this spout.
+            List<Object> args = new ArrayList<Object>(3);
+            args.add(conf);
+            args.add(context);
+            args.add(collectorMap);
+
+            _spout = (ISpout) preparer.applyTo(RT.seq(args));
+            //this is kind of unnecessary for clojure
+            try {
+                _spout.open(conf, context, collector);
+            } catch (AbstractMethodError ignored) {
+                // Deliberately ignored: the delegate may leave open unimplemented.
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Forwards close to the delegate spout. */
+    @Override
+    public void close() {
+        try {
+            _spout.close();
+        } catch (AbstractMethodError ignored) {
+            // Deliberately ignored: the delegate may leave close unimplemented.
+        }
+    }
+
+    /** Forwards nextTuple to the delegate spout. */
+    @Override
+    public void nextTuple() {
+        try {
+            _spout.nextTuple();
+        } catch (AbstractMethodError ignored) {
+            // Deliberately ignored: the delegate may leave nextTuple unimplemented.
+        }
+    }
+
+    /** Forwards the ack for {@code msgId} to the delegate spout. */
+    @Override
+    public void ack(Object msgId) {
+        try {
+            _spout.ack(msgId);
+        } catch (AbstractMethodError ignored) {
+            // Deliberately ignored: the delegate may leave ack unimplemented.
+        }
+    }
+
+    /** Forwards the failure of {@code msgId} to the delegate spout. */
+    @Override
+    public void fail(Object msgId) {
+        try {
+            _spout.fail(msgId);
+        } catch (AbstractMethodError ignored) {
+            // Deliberately ignored: the delegate may leave fail unimplemented.
+        }
+    }
+
+    /** Declares one stream per entry of the {@code fields} map given to the constructor. */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (Map.Entry<String, StreamInfo> entry : _fields.entrySet()) {
+            StreamInfo info = entry.getValue();
+            declarer.declareStream(entry.getKey(), info.is_direct(), new Fields(info.get_output_fields()));
+        }
+    }
+
+    /**
+     * Loads the function identified by {@code confSpec} and applies it to {@code params}.
+     *
+     * @return the value produced by that function, cast to a map
+     * @throws RuntimeException wrapping any {@link Exception} raised by the function
+     */
+    @Override
+    @SuppressWarnings("unchecked") // the Clojure function's result is untyped
+    public Map<String, Object> getComponentConfiguration() {
+        IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
+        try {
+            return (Map<String, Object>) hof.applyTo(RT.seq(_params));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Forwards activate to the delegate spout. */
+    @Override
+    public void activate() {
+        try {
+            _spout.activate();
+        } catch (AbstractMethodError ignored) {
+            // Deliberately ignored: the delegate may leave activate unimplemented.
+        }
+    }
+
+    /** Forwards deactivate to the delegate spout. */
+    @Override
+    public void deactivate() {
+        try {
+            _spout.deactivate();
+        } catch (AbstractMethodError ignored) {
+            // Deliberately ignored: the delegate may leave deactivate unimplemented.
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java b/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java
new file mode 100644
index 0000000..6de5637
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import java.util.Map;
+
+/**
+ * {@link ShellBolt} that also implements {@link IRichBolt} by declaring its output streams from a
+ * map supplied at construction time.
+ */
+public class RichShellBolt extends ShellBolt implements IRichBolt {
+    private Map<String, StreamInfo> _outputs;
+
+    /**
+     * @param command passed to the {@link ShellBolt} constructor
+     * @param outputs stream id mapped to the {@link StreamInfo} describing that stream
+     */
+    public RichShellBolt(String[] command, Map<String, StreamInfo> outputs) {
+        super(command);
+        _outputs = outputs;
+    }
+
+    /**
+     * Declares every stream of the {@code outputs} map, using the direct-stream overload only for
+     * streams whose {@link StreamInfo} is marked direct.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (Map.Entry<String, StreamInfo> output : _outputs.entrySet()) {
+            final String streamId = output.getKey();
+            final StreamInfo info = output.getValue();
+            final boolean direct = info.is_direct();
+            final Fields outputFields = new Fields(info.get_output_fields());
+            if (!direct) {
+                declarer.declareStream(streamId, outputFields);
+                continue;
+            }
+            declarer.declareStream(streamId, true, outputFields);
+        }
+    }
+
+    /** Always returns {@code null}. */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java b/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java
new file mode 100644
index 0000000..9fb7e73
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.spout.ShellSpout;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import java.util.Map;
+
+/**
+ * {@link ShellSpout} that also implements {@link IRichSpout} by declaring its output streams from
+ * a map supplied at construction time.
+ */
+public class RichShellSpout extends ShellSpout implements IRichSpout {
+    private Map<String, StreamInfo> _outputs;
+
+    /**
+     * @param command passed to the {@link ShellSpout} constructor
+     * @param outputs stream id mapped to the {@link StreamInfo} describing that stream
+     */
+    public RichShellSpout(String[] command, Map<String, StreamInfo> outputs) {
+        super(command);
+        _outputs = outputs;
+    }
+
+    /**
+     * Declares every stream of the {@code outputs} map, using the direct-stream overload only for
+     * streams whose {@link StreamInfo} is marked direct.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (Map.Entry<String, StreamInfo> output : _outputs.entrySet()) {
+            final String streamId = output.getKey();
+            final StreamInfo info = output.getValue();
+            final boolean direct = info.is_direct();
+            final Fields outputFields = new Fields(info.get_output_fields());
+            if (!direct) {
+                declarer.declareStream(streamId, outputFields);
+                continue;
+            }
+            declarer.declareStream(streamId, true, outputFields);
+        }
+    }
+
+    /** Always returns {@code null}. */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}