You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:19 UTC
[22/24] incubator-asterixdb git commit: Introduces Feeds 2.0
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedInfo.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedInfo.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedInfo.java
new file mode 100644
index 0000000..c81b73f
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedInfo.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.asterix.feeds;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedInfo {
+ public JobSpecification jobSpec;
+ public JobInfo jobInfo;
+ public JobId jobId;
+ public FeedInfoType infoType;
+ public State state;
+
+ public enum State {
+ ACTIVE,
+ INACTIVE
+ }
+
+ public enum FeedInfoType {
+ INTAKE,
+ COLLECT
+ }
+
+ public FeedInfo(JobSpecification jobSpec, JobId jobId, FeedInfoType infoType) {
+ this.jobSpec = jobSpec;
+ this.jobId = jobId;
+ this.infoType = infoType;
+ this.state = State.INACTIVE;
+ }
+
+ @Override
+ public String toString() {
+ return " job id " + jobId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedJobNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedJobNotificationHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedJobNotificationHandler.java
new file mode 100644
index 0000000..a320faa
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedJobNotificationHandler.java
@@ -0,0 +1,739 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.StringUtils;
+
+import edu.uci.ics.asterix.api.common.FeedWorkCollection.SubscribeFeedWork;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity;
+import edu.uci.ics.asterix.common.feeds.FeedConnectJobInfo;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedIntakeInfo;
+import edu.uci.ics.asterix.common.feeds.FeedJobInfo;
+import edu.uci.ics.asterix.common.feeds.FeedJobInfo.FeedJobState;
+import edu.uci.ics.asterix.common.feeds.FeedJobInfo.JobType;
+import edu.uci.ics.asterix.common.feeds.FeedJointKey;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint.State;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.common.feeds.message.StorageReportFeedMessage;
+import edu.uci.ics.asterix.feeds.FeedLifecycleListener.Message;
+import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.FeedCollectOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedWorkManager;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
+
+public class FeedJobNotificationHandler implements Runnable {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedJobNotificationHandler.class.getName());
+
+ private final LinkedBlockingQueue<Message> inbox;
+ private final Map<FeedConnectionId, List<IFeedLifecycleEventSubscriber>> eventSubscribers;
+
+ private final Map<JobId, FeedJobInfo> jobInfos;
+ private final Map<FeedId, FeedIntakeInfo> intakeJobInfos;
+ private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos;
+ private final Map<FeedId, List<IFeedJoint>> feedPipeline;
+ private final Map<FeedConnectionId, Pair<IIntakeProgressTracker, Long>> feedIntakeProgressTrackers;
+
+ public FeedJobNotificationHandler(LinkedBlockingQueue<Message> inbox) {
+ this.inbox = inbox;
+ this.jobInfos = new HashMap<JobId, FeedJobInfo>();
+ this.intakeJobInfos = new HashMap<FeedId, FeedIntakeInfo>();
+ this.connectJobInfos = new HashMap<FeedConnectionId, FeedConnectJobInfo>();
+ this.feedPipeline = new HashMap<FeedId, List<IFeedJoint>>();
+ this.eventSubscribers = new HashMap<FeedConnectionId, List<IFeedLifecycleEventSubscriber>>();
+ this.feedIntakeProgressTrackers = new HashMap<FeedConnectionId, Pair<IIntakeProgressTracker, Long>>();
+ }
+
+ @Override
+ public void run() {
+ Message mesg;
+ while (true) {
+ try {
+ mesg = inbox.take();
+ switch (mesg.messageKind) {
+ case JOB_START:
+ handleJobStartMessage(mesg);
+ break;
+ case JOB_FINISH:
+ handleJobFinishMessage(mesg);
+ break;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+ }
+
+ public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId,
+ IIntakeProgressTracker feedIntakeProgressTracker) {
+ if (feedIntakeProgressTrackers.get(connectionId) == null) {
+ this.feedIntakeProgressTrackers.put(connectionId, new Pair<IIntakeProgressTracker, Long>(
+ feedIntakeProgressTracker, 0L));
+ } else {
+ throw new IllegalStateException(" Progress tracker for connection " + connectionId
+ + " is alreader registered");
+ }
+ }
+
+ public void deregisterFeedIntakeProgressTracker(FeedConnectionId connectionId) {
+ this.feedIntakeProgressTrackers.remove(connectionId);
+ }
+
+ public void updateTrackingInformation(StorageReportFeedMessage srm) {
+ Pair<IIntakeProgressTracker, Long> p = feedIntakeProgressTrackers.get(srm.getConnectionId());
+ if (p != null && p.second < srm.getLastPersistedTupleIntakeTimestamp()) {
+ p.second = srm.getLastPersistedTupleIntakeTimestamp();
+ p.first.notifyIngestedTupleTimestamp(p.second);
+ }
+ }
+
+ public Collection<FeedIntakeInfo> getFeedIntakeInfos() {
+ return intakeJobInfos.values();
+ }
+
+ public Collection<FeedConnectJobInfo> getFeedConnectInfos() {
+ return connectJobInfos.values();
+ }
+
+ public void registerFeedJoint(IFeedJoint feedJoint) {
+ List<IFeedJoint> feedJointsOnPipeline = feedPipeline.get(feedJoint.getOwnerFeedId());
+ if (feedJointsOnPipeline == null) {
+ feedJointsOnPipeline = new ArrayList<IFeedJoint>();
+ feedPipeline.put(feedJoint.getOwnerFeedId(), feedJointsOnPipeline);
+ feedJointsOnPipeline.add(feedJoint);
+ } else {
+ if (!feedJointsOnPipeline.contains(feedJoint)) {
+ feedJointsOnPipeline.add(feedJoint);
+ } else {
+ throw new IllegalArgumentException("Feed joint " + feedJoint + " already registered");
+ }
+ }
+ }
+
+ public void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpec) throws HyracksDataException {
+ if (jobInfos.get(jobId) != null) {
+ throw new IllegalStateException("Feed job already registered");
+ }
+
+ List<IFeedJoint> joints = feedPipeline.get(feedId);
+ IFeedJoint intakeJoint = null;
+ for (IFeedJoint joint : joints) {
+ if (joint.getType().equals(IFeedJoint.FeedJointType.INTAKE)) {
+ intakeJoint = joint;
+ break;
+ }
+ }
+
+ if (intakeJoint != null) {
+ FeedIntakeInfo intakeJobInfo = new FeedIntakeInfo(jobId, FeedJobState.CREATED, FeedJobInfo.JobType.INTAKE,
+ feedId, intakeJoint, jobSpec);
+ intakeJobInfos.put(feedId, intakeJobInfo);
+ jobInfos.put(jobId, intakeJobInfo);
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Registered feed intake [" + jobId + "]" + " for feed " + feedId);
+ }
+ } else {
+ throw new HyracksDataException("Could not register feed intake job [" + jobId + "]" + " for feed "
+ + feedId);
+ }
+ }
+
+ public void registerFeedCollectionJob(FeedId sourceFeedId, FeedConnectionId connectionId, JobId jobId,
+ JobSpecification jobSpec, Map<String, String> feedPolicy) {
+ if (jobInfos.get(jobId) != null) {
+ throw new IllegalStateException("Feed job already registered");
+ }
+
+ List<IFeedJoint> feedJoints = feedPipeline.get(sourceFeedId);
+ FeedConnectionId cid = null;
+ IFeedJoint sourceFeedJoint = null;
+ for (IFeedJoint joint : feedJoints) {
+ cid = joint.getReceiver(connectionId);
+ if (cid != null) {
+ sourceFeedJoint = joint;
+ break;
+ }
+ }
+
+ if (cid != null) {
+ FeedConnectJobInfo cInfo = new FeedConnectJobInfo(jobId, FeedJobState.CREATED, connectionId,
+ sourceFeedJoint, null, jobSpec, feedPolicy);
+ jobInfos.put(jobId, cInfo);
+ connectJobInfos.put(connectionId, cInfo);
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Registered feed connection [" + jobId + "]" + " for feed " + connectionId);
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Could not register feed collection job [" + jobId + "]" + " for feed connection "
+ + connectionId);
+ }
+ }
+
+ }
+
+ public void deregisterFeedIntakeJob(JobId jobId) {
+ if (jobInfos.get(jobId) == null) {
+ throw new IllegalStateException(" Feed Intake job not registered ");
+ }
+
+ FeedIntakeInfo info = (FeedIntakeInfo) jobInfos.get(jobId);
+ jobInfos.remove(jobId);
+ intakeJobInfos.remove(info.getFeedId());
+
+ if (!info.getState().equals(FeedJobState.UNDER_RECOVERY)) {
+ List<IFeedJoint> joints = feedPipeline.get(info.getFeedId());
+ joints.remove(info.getIntakeFeedJoint());
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Deregistered feed intake job [" + jobId + "]");
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Not removing feed joint as intake job is in " + FeedJobState.UNDER_RECOVERY + " state.");
+ }
+ }
+
+ }
+
+ private void handleJobStartMessage(Message message) throws Exception {
+ FeedJobInfo jobInfo = jobInfos.get(message.jobId);
+ switch (jobInfo.getJobType()) {
+ case INTAKE:
+ handleIntakeJobStartMessage((FeedIntakeInfo) jobInfo);
+ break;
+ case FEED_CONNECT:
+ handleCollectJobStartMessage((FeedConnectJobInfo) jobInfo);
+ break;
+ }
+
+ }
+
+ private void handleJobFinishMessage(Message message) throws Exception {
+ FeedJobInfo jobInfo = jobInfos.get(message.jobId);
+ switch (jobInfo.getJobType()) {
+ case INTAKE:
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Intake Job finished for feed intake " + jobInfo.getJobId());
+ }
+ handleFeedIntakeJobFinishMessage((FeedIntakeInfo) jobInfo, message);
+ break;
+ case FEED_CONNECT:
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Collect Job finished for " + (FeedConnectJobInfo) jobInfo);
+ }
+ handleFeedCollectJobFinishMessage((FeedConnectJobInfo) jobInfo);
+ break;
+ }
+
+ }
+
+ private synchronized void handleIntakeJobStartMessage(FeedIntakeInfo intakeJobInfo) throws Exception {
+ List<OperatorDescriptorId> intakeOperatorIds = new ArrayList<OperatorDescriptorId>();
+ Map<OperatorDescriptorId, IOperatorDescriptor> operators = intakeJobInfo.getSpec().getOperatorMap();
+ for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
+ IOperatorDescriptor opDesc = entry.getValue();
+ if (opDesc instanceof FeedIntakeOperatorDescriptor) {
+ intakeOperatorIds.add(opDesc.getOperatorId());
+ }
+ }
+
+ IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ JobInfo info = hcc.getJobInfo(intakeJobInfo.getJobId());
+ List<String> intakeLocations = new ArrayList<String>();
+ for (OperatorDescriptorId intakeOperatorId : intakeOperatorIds) {
+ Map<Integer, String> operatorLocations = info.getOperatorLocations().get(intakeOperatorId);
+ int nOperatorInstances = operatorLocations.size();
+ for (int i = 0; i < nOperatorInstances; i++) {
+ intakeLocations.add(operatorLocations.get(i));
+ }
+ }
+ // intakeLocations is an ordered list; element at position i corresponds to location of i'th instance of operator
+ intakeJobInfo.setIntakeLocation(intakeLocations);
+ intakeJobInfo.getIntakeFeedJoint().setState(State.ACTIVE);
+ intakeJobInfo.setState(FeedJobState.ACTIVE);
+
+ // notify event listeners
+ notifyFeedEventSubscribers(intakeJobInfo, FeedLifecycleEvent.FEED_INTAKE_STARTED);
+ }
+
+ private void handleCollectJobStartMessage(FeedConnectJobInfo cInfo) throws RemoteException, ACIDException {
+ // set locations of feed sub-operations (intake, compute, store)
+ setLocations(cInfo);
+
+ // activate joints
+ List<IFeedJoint> joints = feedPipeline.get(cInfo.getConnectionId().getFeedId());
+ for (IFeedJoint joint : joints) {
+ if (joint.getProvider().equals(cInfo.getConnectionId())) {
+ joint.setState(State.ACTIVE);
+ if (joint.getType().equals(IFeedJoint.FeedJointType.COMPUTE)) {
+ cInfo.setComputeFeedJoint(joint);
+ }
+ }
+ }
+ cInfo.setState(FeedJobState.ACTIVE);
+
+ // register activity in metadata
+ registerFeedActivity(cInfo);
+ // notify event listeners
+ notifyFeedEventSubscribers(cInfo, FeedLifecycleEvent.FEED_COLLECT_STARTED);
+ }
+
+ private void notifyFeedEventSubscribers(FeedJobInfo jobInfo, FeedLifecycleEvent event) {
+ JobType jobType = jobInfo.getJobType();
+ List<FeedConnectionId> impactedConnections = new ArrayList<FeedConnectionId>();
+ if (jobType.equals(JobType.INTAKE)) {
+ FeedId feedId = ((FeedIntakeInfo) jobInfo).getFeedId();
+ for (FeedConnectionId connId : eventSubscribers.keySet()) {
+ if (connId.getFeedId().equals(feedId)) {
+ impactedConnections.add(connId);
+ }
+ }
+ } else {
+ impactedConnections.add(((FeedConnectJobInfo) jobInfo).getConnectionId());
+ }
+
+ for (FeedConnectionId connId : impactedConnections) {
+ List<IFeedLifecycleEventSubscriber> subscribers = eventSubscribers.get(connId);
+ if (subscribers != null && !subscribers.isEmpty()) {
+ for (IFeedLifecycleEventSubscriber subscriber : subscribers) {
+ subscriber.handleFeedEvent(event);
+ }
+ }
+ }
+ }
+
+ public synchronized void submitFeedConnectionRequest(IFeedJoint feedJoint, final FeedConnectionRequest request)
+ throws Exception {
+ List<String> locations = null;
+ switch (feedJoint.getType()) {
+ case INTAKE:
+ FeedIntakeInfo intakeInfo = intakeJobInfos.get(feedJoint.getOwnerFeedId());
+ locations = intakeInfo.getIntakeLocation();
+ break;
+ case COMPUTE:
+ FeedConnectionId connectionId = feedJoint.getProvider();
+ FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
+ locations = cInfo.getComputeLocations();
+ break;
+ }
+
+ SubscribeFeedWork work = new SubscribeFeedWork(locations.toArray(new String[] {}), request);
+ FeedWorkManager.INSTANCE.submitWork(work, new SubscribeFeedWork.FeedSubscribeWorkEventListener());
+ }
+
+ public IFeedJoint getSourceFeedJoint(FeedConnectionId connectionId) {
+ FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
+ if (cInfo != null) {
+ return cInfo.getSourceFeedJoint();
+ }
+ return null;
+ }
+
+ public Set<FeedConnectionId> getActiveFeedConnections() {
+ Set<FeedConnectionId> activeConnections = new HashSet<FeedConnectionId>();
+ for (FeedConnectJobInfo cInfo : connectJobInfos.values()) {
+ if (cInfo.getState().equals(FeedJobState.ACTIVE)) {
+ activeConnections.add(cInfo.getConnectionId());
+ }
+ }
+ return activeConnections;
+ }
+
+ public boolean isFeedConnectionActive(FeedConnectionId connectionId) {
+ FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
+ if (cInfo != null) {
+ return cInfo.getState().equals(FeedJobState.ACTIVE);
+ }
+ return false;
+ }
+
+ public void setJobState(FeedConnectionId connectionId, FeedJobState jobState) {
+ FeedConnectJobInfo connectJobInfo = connectJobInfos.get(connectionId);
+ connectJobInfo.setState(jobState);
+ }
+
+ public FeedJobState getFeedJobState(FeedConnectionId connectionId) {
+ return connectJobInfos.get(connectionId).getState();
+ }
+
+ private void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, Message message) throws Exception {
+ IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ JobInfo info = hcc.getJobInfo(message.jobId);
+ JobStatus status = info.getStatus();
+ FeedLifecycleEvent event;
+ event = status.equals(JobStatus.FAILURE) ? FeedLifecycleEvent.FEED_INTAKE_FAILURE
+ : FeedLifecycleEvent.FEED_ENDED;
+
+ // remove feed joints
+ deregisterFeedIntakeJob(message.jobId);
+
+ // notify event listeners
+ notifyFeedEventSubscribers(intakeInfo, event);
+
+ }
+
+ private void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
+ FeedConnectionId connectionId = cInfo.getConnectionId();
+
+ IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+ JobStatus status = info.getStatus();
+ boolean failure = status != null && status.equals(JobStatus.FAILURE);
+ FeedPolicyAccessor fpa = new FeedPolicyAccessor(cInfo.getFeedPolicy());
+
+ boolean removeJobHistory = !failure;
+ boolean retainSubsription = cInfo.getState().equals(FeedJobState.UNDER_RECOVERY)
+ || (failure && fpa.continueOnHardwareFailure());
+
+ if (!retainSubsription) {
+ IFeedJoint feedJoint = cInfo.getSourceFeedJoint();
+ feedJoint.removeReceiver(connectionId);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription");
+ }
+ removeFeedJointsPostPipelineTermination(cInfo.getConnectionId());
+ }
+
+ if (removeJobHistory) {
+ connectJobInfos.remove(connectionId);
+ jobInfos.remove(cInfo.getJobId());
+ feedIntakeProgressTrackers.remove(cInfo.getConnectionId());
+ }
+ deregisterFeedActivity(cInfo);
+
+ // notify event listeners
+ FeedLifecycleEvent event = failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_ENDED;
+ notifyFeedEventSubscribers(cInfo, event);
+ }
+
+ private void registerFeedActivity(FeedConnectJobInfo cInfo) {
+ Map<String, String> feedActivityDetails = new HashMap<String, String>();
+
+ if (cInfo.getCollectLocations() != null) {
+ feedActivityDetails.put(FeedActivity.FeedActivityDetails.INTAKE_LOCATIONS,
+ StringUtils.join(cInfo.getCollectLocations().iterator(), ','));
+ }
+
+ if (cInfo.getComputeLocations() != null) {
+ feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS,
+ StringUtils.join(cInfo.getComputeLocations().iterator(), ','));
+ }
+
+ if (cInfo.getStorageLocations() != null) {
+ feedActivityDetails.put(FeedActivity.FeedActivityDetails.STORAGE_LOCATIONS,
+ StringUtils.join(cInfo.getStorageLocations().iterator(), ','));
+ }
+
+ String policyName = cInfo.getFeedPolicy().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+ feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, policyName);
+
+ feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_CONNECT_TIMESTAMP, (new Date()).toString());
+ try {
+ FeedActivity feedActivity = new FeedActivity(cInfo.getConnectionId().getFeedId().getDataverse(), cInfo
+ .getConnectionId().getFeedId().getFeedName(), cInfo.getConnectionId().getDatasetName(),
+ feedActivityDetails);
+ CentralFeedManager.getInstance().getFeedLoadManager()
+ .reportFeedActivity(cInfo.getConnectionId(), feedActivity);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to register feed activity for " + cInfo + " " + e.getMessage());
+ }
+
+ }
+
+ }
+
+ public void deregisterFeedActivity(FeedConnectJobInfo cInfo) {
+ try {
+ CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(cInfo.getConnectionId());
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to deregister feed activity for " + cInfo + " " + e.getMessage());
+ }
+ }
+ }
+
+ public void removeFeedJointsPostPipelineTermination(FeedConnectionId connectionId) {
+ FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
+ List<IFeedJoint> feedJoints = feedPipeline.get(connectionId.getFeedId());
+
+ IFeedJoint sourceJoint = cInfo.getSourceFeedJoint();
+ List<FeedConnectionId> all = sourceJoint.getReceivers();
+ boolean removeSourceJoint = all.size() < 2;
+ if (removeSourceJoint) {
+ feedJoints.remove(sourceJoint);
+ }
+
+ IFeedJoint computeJoint = cInfo.getComputeFeedJoint();
+ if (computeJoint != null && computeJoint.getReceivers().size() < 2) {
+ feedJoints.remove(computeJoint);
+ }
+ }
+
+ public boolean isRegisteredFeedJob(JobId jobId) {
+ return jobInfos.get(jobId) != null;
+ }
+
+ public List<String> getFeedComputeLocations(FeedId feedId) {
+ List<IFeedJoint> feedJoints = feedPipeline.get(feedId);
+ for (IFeedJoint joint : feedJoints) {
+ if (joint.getFeedJointKey().getFeedId().equals(feedId)) {
+ return connectJobInfos.get(joint.getProvider()).getComputeLocations();
+ }
+ }
+ return null;
+ }
+
+ public List<String> getFeedStorageLocations(FeedConnectionId connectionId) {
+ return connectJobInfos.get(connectionId).getStorageLocations();
+ }
+
+ public List<String> getFeedCollectLocations(FeedConnectionId connectionId) {
+ return connectJobInfos.get(connectionId).getCollectLocations();
+ }
+
+ public List<String> getFeedIntakeLocations(FeedId feedId) {
+ return intakeJobInfos.get(feedId).getIntakeLocation();
+ }
+
+ public JobId getFeedCollectJobId(FeedConnectionId connectionId) {
+ return connectJobInfos.get(connectionId).getJobId();
+ }
+
+ public void registerFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
+ List<IFeedLifecycleEventSubscriber> subscribers = eventSubscribers.get(connectionId);
+ if (subscribers == null) {
+ subscribers = new ArrayList<IFeedLifecycleEventSubscriber>();
+ eventSubscribers.put(connectionId, subscribers);
+ }
+ subscribers.add(subscriber);
+ }
+
+ public void deregisterFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
+ List<IFeedLifecycleEventSubscriber> subscribers = eventSubscribers.get(connectionId);
+ if (subscribers != null) {
+ subscribers.remove(subscriber);
+ }
+ }
+
+ //============================
+
+ public boolean isFeedPointAvailable(FeedJointKey feedJointKey) {
+ List<IFeedJoint> joints = feedPipeline.get(feedJointKey.getFeedId());
+ if (joints != null && !joints.isEmpty()) {
+ for (IFeedJoint joint : joints) {
+ if (joint.getFeedJointKey().equals(feedJointKey)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public Collection<IFeedJoint> getFeedIntakeJoints() {
+ List<IFeedJoint> intakeFeedPoints = new ArrayList<IFeedJoint>();
+ for (FeedIntakeInfo info : intakeJobInfos.values()) {
+ intakeFeedPoints.add(info.getIntakeFeedJoint());
+ }
+ return intakeFeedPoints;
+ }
+
+ public IFeedJoint getFeedJoint(FeedJointKey feedPointKey) {
+ List<IFeedJoint> joints = feedPipeline.get(feedPointKey.getFeedId());
+ if (joints != null && !joints.isEmpty()) {
+ for (IFeedJoint joint : joints) {
+ if (joint.getFeedJointKey().equals(feedPointKey)) {
+ return joint;
+ }
+ }
+ }
+ return null;
+ }
+
+ public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
+ IFeedJoint feedJoint = getFeedJoint(feedJointKey);
+ if (feedJoint != null) {
+ return feedJoint;
+ } else {
+ String jointKeyString = feedJointKey.getStringRep();
+ List<IFeedJoint> jointsOnPipeline = feedPipeline.get(feedJointKey.getFeedId());
+ IFeedJoint candidateJoint = null;
+ if (jointsOnPipeline != null) {
+ for (IFeedJoint joint : jointsOnPipeline) {
+ if (jointKeyString.contains(joint.getFeedJointKey().getStringRep())) {
+ if (candidateJoint == null) {
+ candidateJoint = joint;
+ } else if (joint.getFeedJointKey().getStringRep()
+ .contains(candidateJoint.getFeedJointKey().getStringRep())) { // found feed point is a super set of the earlier find
+ candidateJoint = joint;
+ }
+ }
+ }
+ }
+ return candidateJoint;
+ }
+ }
+
+ public JobSpecification getCollectJobSpecification(FeedConnectionId connectionId) {
+ return connectJobInfos.get(connectionId).getSpec();
+ }
+
+ public IFeedJoint getFeedPoint(FeedId sourceFeedId, IFeedJoint.FeedJointType type) {
+ List<IFeedJoint> joints = feedPipeline.get(sourceFeedId);
+ for (IFeedJoint joint : joints) {
+ if (joint.getType().equals(type)) {
+ return joint;
+ }
+ }
+ return null;
+ }
+
+ public FeedConnectJobInfo getFeedConnectJobInfo(FeedConnectionId connectionId) {
+ return connectJobInfos.get(connectionId);
+ }
+
+ private void setLocations(FeedConnectJobInfo cInfo) {
+ JobSpecification jobSpec = cInfo.getSpec();
+
+ List<OperatorDescriptorId> collectOperatorIds = new ArrayList<OperatorDescriptorId>();
+ List<OperatorDescriptorId> computeOperatorIds = new ArrayList<OperatorDescriptorId>();
+ List<OperatorDescriptorId> storageOperatorIds = new ArrayList<OperatorDescriptorId>();
+
+ Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
+ for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
+ IOperatorDescriptor opDesc = entry.getValue();
+ IOperatorDescriptor actualOp = null;
+ if (opDesc instanceof FeedMetaOperatorDescriptor) {
+ actualOp = ((FeedMetaOperatorDescriptor) opDesc).getCoreOperator();
+ } else {
+ actualOp = opDesc;
+ }
+
+ if (actualOp instanceof AlgebricksMetaOperatorDescriptor) {
+ AlgebricksMetaOperatorDescriptor op = ((AlgebricksMetaOperatorDescriptor) actualOp);
+ IPushRuntimeFactory[] runtimeFactories = op.getPipeline().getRuntimeFactories();
+ boolean computeOp = false;
+ for (IPushRuntimeFactory rf : runtimeFactories) {
+ if (rf instanceof AssignRuntimeFactory) {
+ IConnectorDescriptor connDesc = jobSpec.getOperatorInputMap().get(op.getOperatorId()).get(0);
+ IOperatorDescriptor sourceOp = jobSpec.getConnectorOperatorMap().get(connDesc.getConnectorId())
+ .getLeft().getLeft();
+ if (sourceOp instanceof FeedCollectOperatorDescriptor) {
+ computeOp = true;
+ break;
+ }
+ }
+ }
+ if (computeOp) {
+ computeOperatorIds.add(entry.getKey());
+ }
+ } else if (actualOp instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
+ storageOperatorIds.add(entry.getKey());
+ } else if (actualOp instanceof FeedCollectOperatorDescriptor) {
+ collectOperatorIds.add(entry.getKey());
+ }
+ }
+
+ try {
+ IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+ List<String> collectLocations = new ArrayList<String>();
+ for (OperatorDescriptorId collectOpId : collectOperatorIds) {
+ Map<Integer, String> operatorLocations = info.getOperatorLocations().get(collectOpId);
+ int nOperatorInstances = operatorLocations.size();
+ for (int i = 0; i < nOperatorInstances; i++) {
+ collectLocations.add(operatorLocations.get(i));
+ }
+ }
+
+ List<String> computeLocations = new ArrayList<String>();
+ for (OperatorDescriptorId computeOpId : computeOperatorIds) {
+ Map<Integer, String> operatorLocations = info.getOperatorLocations().get(computeOpId);
+ if (operatorLocations != null) {
+ int nOperatorInstances = operatorLocations.size();
+ for (int i = 0; i < nOperatorInstances; i++) {
+ computeLocations.add(operatorLocations.get(i));
+ }
+ } else {
+ computeLocations.clear();
+ computeLocations.addAll(collectLocations);
+ }
+ }
+
+ List<String> storageLocations = new ArrayList<String>();
+ for (OperatorDescriptorId storageOpId : storageOperatorIds) {
+ Map<Integer, String> operatorLocations = info.getOperatorLocations().get(storageOpId);
+ if (operatorLocations == null) {
+ continue;
+ }
+ int nOperatorInstances = operatorLocations.size();
+ for (int i = 0; i < nOperatorInstances; i++) {
+ storageLocations.add(operatorLocations.get(i));
+ }
+ }
+ cInfo.setCollectLocations(collectLocations);
+ cInfo.setComputeLocations(computeLocations);
+ cInfo.setStorageLocations(storageLocations);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedJoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedJoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedJoint.java
new file mode 100644
index 0000000..6ba1b30
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedJoint.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedJointKey;
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
+
+public class FeedJoint implements IFeedJoint {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedJoint.class.getName());
+
+ /** A unique key associated with the feed point **/
+ private final FeedJointKey key;
+
+ /** The state associated with the FeedJoint **/
+ private State state;
+
+ /** A list of subscribers that receive data from this FeedJoint **/
+ private final List<FeedConnectionId> receivers;
+
+ /** The feedId on which the feedPoint resides **/
+ private final FeedId ownerFeedId;
+
+ /** A list of feed subscription requests submitted for subscribing to the FeedPoint's data **/
+ private final List<FeedConnectionRequest> connectionRequests;
+
+ private final ConnectionLocation connectionLocation;
+
+ private final FeedJointType type;
+
+ private FeedConnectionId provider;
+
+ public FeedJoint(FeedJointKey key, FeedId ownerFeedId, ConnectionLocation subscriptionLocation, FeedJointType type,
+ FeedConnectionId provider) {
+ this.key = key;
+ this.ownerFeedId = ownerFeedId;
+ this.type = type;
+ this.receivers = new ArrayList<FeedConnectionId>();
+ this.state = State.CREATED;
+ this.connectionLocation = subscriptionLocation;
+ this.connectionRequests = new ArrayList<FeedConnectionRequest>();
+ this.provider = provider;
+ }
+
+ @Override
+ public int hashCode() {
+ return key.hashCode();
+ }
+
+ public void addReceiver(FeedConnectionId connectionId) {
+ receivers.add(connectionId);
+ }
+
+ public void removeReceiver(FeedConnectionId connectionId) {
+ receivers.remove(connectionId);
+ }
+
+ public synchronized void addConnectionRequest(FeedConnectionRequest request) {
+ connectionRequests.add(request);
+ if (state.equals(State.ACTIVE)) {
+ handlePendingConnectionRequest();
+ }
+ }
+
+ public synchronized void setState(State state) {
+ if (this.state.equals(state)) {
+ return;
+ }
+ this.state = state;
+ if (this.state.equals(State.ACTIVE)) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Feed joint " + this + " is now " + State.ACTIVE);
+ }
+ handlePendingConnectionRequest();
+ }
+ }
+
+ private void handlePendingConnectionRequest() {
+ for (FeedConnectionRequest connectionRequest : connectionRequests) {
+ FeedConnectionId connectionId = new FeedConnectionId(connectionRequest.getReceivingFeedId(),
+ connectionRequest.getTargetDataset());
+ try {
+ FeedLifecycleListener.INSTANCE.submitFeedConnectionRequest(this, connectionRequest);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Submitted feed connection request " + connectionRequest + " at feed joint " + this);
+ }
+ addReceiver(connectionId);
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unsuccessful attempt at submitting connection request " + connectionRequest
+ + " at feed joint " + this + ". Message " + e.getMessage());
+ }
+ e.printStackTrace();
+ }
+ }
+ connectionRequests.clear();
+ }
+
+ public FeedConnectionId getReceiver(FeedConnectionId connectionId) {
+ for (FeedConnectionId cid : receivers) {
+ if (cid.equals(connectionId)) {
+ return cid;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return key.toString() + " [" + connectionLocation + "]" + "[" + state + "]";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null) {
+ return false;
+ }
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof FeedJoint)) {
+ return false;
+ }
+ return ((FeedJoint) o).getFeedJointKey().equals(this.key);
+ }
+
+ public FeedId getOwnerFeedId() {
+ return ownerFeedId;
+ }
+
+ public List<FeedConnectionRequest> getConnectionRequests() {
+ return connectionRequests;
+ }
+
+ public ConnectionLocation getConnectionLocation() {
+ return connectionLocation;
+ }
+
+ public FeedJointType getType() {
+ return type;
+ }
+
+ @Override
+ public FeedConnectionId getProvider() {
+ return provider;
+ }
+
+ public List<FeedConnectionId> getReceivers() {
+ return receivers;
+ }
+
+ public FeedJointKey getKey() {
+ return key;
+ }
+
+ public synchronized State getState() {
+ return state;
+ }
+
+ @Override
+ public FeedJointKey getFeedJointKey() {
+ return key;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedLifecycleListener.java
new file mode 100644
index 0000000..999537f
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedLifecycleListener.java
@@ -0,0 +1,486 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.DataverseDecl;
+import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
+import edu.uci.ics.asterix.aql.expression.Identifier;
+import edu.uci.ics.asterix.aql.translator.AqlTranslator;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork.ClusterState;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.common.feeds.FeedConnectJobInfo;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedIntakeInfo;
+import edu.uci.ics.asterix.common.feeds.FeedJobInfo;
+import edu.uci.ics.asterix.common.feeds.FeedJobInfo.FeedJobState;
+import edu.uci.ics.asterix.common.feeds.FeedJointKey;
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.common.feeds.message.StorageReportFeedMessage;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
+import edu.uci.ics.asterix.metadata.feeds.FeedCollectOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/**
+ * A listener that subscribes to events associated with cluster membership
+ * (nodes joining/leaving the cluster) and job lifecycle (start/end of a job).
+ * Subscription to such events allows keeping track of feed ingestion jobs and
+ * take any corrective action that may be required when a node involved in a
+ * feed leaves the cluster.
+ */
+public class FeedLifecycleListener implements IFeedLifecycleListener {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedLifecycleListener.class.getName());
+
+ public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
+
+ private final LinkedBlockingQueue<Message> jobEventInbox;
+ private final LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
+ private final Map<FeedCollectInfo, List<String>> dependentFeeds = new HashMap<FeedCollectInfo, List<String>>();
+ private final Map<FeedConnectionId, LinkedBlockingQueue<String>> feedReportQueue;
+ private final FeedJobNotificationHandler feedJobNotificationHandler;
+ private final FeedWorkRequestResponseHandler feedWorkRequestResponseHandler;
+ private final ExecutorService executorService;
+
+ private ClusterState state;
+
+ private FeedLifecycleListener() {
+ this.jobEventInbox = new LinkedBlockingQueue<Message>();
+ this.feedJobNotificationHandler = new FeedJobNotificationHandler(jobEventInbox);
+ this.responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>();
+ this.feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox);
+ this.feedReportQueue = new HashMap<FeedConnectionId, LinkedBlockingQueue<String>>();
+ this.executorService = Executors.newCachedThreadPool();
+ this.executorService.execute(feedJobNotificationHandler);
+ this.executorService.execute(feedWorkRequestResponseHandler);
+ ClusterManager.INSTANCE.registerSubscriber(this);
+ this.state = AsterixClusterProperties.INSTANCE.getState();
+ }
+
+ @Override
+ public void notifyJobStart(JobId jobId) throws HyracksException {
+ if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
+ jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_START));
+ }
+ }
+
+ @Override
+ public void notifyJobFinish(JobId jobId) throws HyracksException {
+ if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
+ jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_FINISH));
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
+ }
+ }
+ }
+
+ public FeedConnectJobInfo getFeedConnectJobInfo(FeedConnectionId connectionId) {
+ return feedJobNotificationHandler.getFeedConnectJobInfo(connectionId);
+ }
+
+ public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId,
+ IIntakeProgressTracker feedIntakeProgressTracker) {
+ feedJobNotificationHandler.registerFeedIntakeProgressTracker(connectionId, feedIntakeProgressTracker);
+ }
+
+ public void deregisterFeedIntakeProgressTracker(FeedConnectionId connectionId) {
+ feedJobNotificationHandler.deregisterFeedIntakeProgressTracker(connectionId);
+ }
+
+ public void updateTrackingInformation(StorageReportFeedMessage srm) {
+ feedJobNotificationHandler.updateTrackingInformation(srm);
+ }
+
+ /*
+ * Traverse job specification to categorize job as a feed intake job or a feed collection job
+ */
+ @Override
+ public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
+ JobSpecification spec = acggf.getJobSpecification();
+ FeedConnectionId feedConnectionId = null;
+ Map<String, String> feedPolicy = null;
+ for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
+ if (opDesc instanceof FeedCollectOperatorDescriptor) {
+ feedConnectionId = ((FeedCollectOperatorDescriptor) opDesc).getFeedConnectionId();
+ feedPolicy = ((FeedCollectOperatorDescriptor) opDesc).getFeedPolicyProperties();
+ feedJobNotificationHandler.registerFeedCollectionJob(
+ ((FeedCollectOperatorDescriptor) opDesc).getSourceFeedId(), feedConnectionId, jobId, spec,
+ feedPolicy);
+ break;
+ } else if (opDesc instanceof FeedIntakeOperatorDescriptor) {
+ feedJobNotificationHandler.registerFeedIntakeJob(((FeedIntakeOperatorDescriptor) opDesc).getFeedId(),
+ jobId, spec);
+ break;
+ }
+ }
+ }
+
+ public void setJobState(FeedConnectionId connectionId, FeedJobState jobState) {
+ feedJobNotificationHandler.setJobState(connectionId, jobState);
+ }
+
+ public FeedJobState getFeedJobState(FeedConnectionId connectionId) {
+ return feedJobNotificationHandler.getFeedJobState(connectionId);
+ }
+
+ public static class Message {
+ public JobId jobId;
+
+ public enum MessageKind {
+ JOB_START,
+ JOB_FINISH
+ }
+
+ public MessageKind messageKind;
+
+ public Message(JobId jobId, MessageKind msgKind) {
+ this.jobId = jobId;
+ this.messageKind = msgKind;
+ }
+ }
+
+ @Override
+ public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
+ Set<IClusterManagementWork> workToBeDone = new HashSet<IClusterManagementWork>();
+
+ Collection<FeedIntakeInfo> intakeInfos = feedJobNotificationHandler.getFeedIntakeInfos();
+ Collection<FeedConnectJobInfo> connectJobInfos = feedJobNotificationHandler.getFeedConnectInfos();
+
+ Map<String, List<FeedJobInfo>> impactedJobs = new HashMap<String, List<FeedJobInfo>>();
+
+ for (String deadNode : deadNodeIds) {
+ for (FeedIntakeInfo intakeInfo : intakeInfos) {
+ if (intakeInfo.getIntakeLocation().contains(deadNode)) {
+ List<FeedJobInfo> infos = impactedJobs.get(deadNode);
+ if (infos == null) {
+ infos = new ArrayList<FeedJobInfo>();
+ impactedJobs.put(deadNode, infos);
+ }
+ infos.add(intakeInfo);
+ intakeInfo.setState(FeedJobState.UNDER_RECOVERY);
+ }
+ }
+
+ for (FeedConnectJobInfo connectInfo : connectJobInfos) {
+ if (connectInfo.getStorageLocations().contains(deadNode)) {
+ continue;
+ }
+ if (connectInfo.getComputeLocations().contains(deadNode)
+ || connectInfo.getCollectLocations().contains(deadNode)) {
+ List<FeedJobInfo> infos = impactedJobs.get(deadNode);
+ if (infos == null) {
+ infos = new ArrayList<FeedJobInfo>();
+ impactedJobs.put(deadNode, infos);
+ }
+ infos.add(connectInfo);
+ connectInfo.setState(FeedJobState.UNDER_RECOVERY);
+ feedJobNotificationHandler.deregisterFeedActivity(connectInfo);
+ }
+ }
+
+ }
+
+ if (impactedJobs.size() > 0) {
+ AddNodeWork addNodeWork = new AddNodeWork(deadNodeIds, deadNodeIds.size(), this);
+ feedWorkRequestResponseHandler.registerFeedWork(addNodeWork.getWorkId(), impactedJobs);
+ workToBeDone.add(addNodeWork);
+ }
+ return workToBeDone;
+
+ }
+
+ public static class FailureReport {
+
+ private final List<Pair<FeedConnectJobInfo, List<String>>> recoverableConnectJobs;
+ private final Map<IFeedJoint, List<String>> recoverableIntakeFeedIds;
+
+ public FailureReport(Map<IFeedJoint, List<String>> recoverableIntakeFeedIds,
+ List<Pair<FeedConnectJobInfo, List<String>>> recoverableSubscribers) {
+ this.recoverableConnectJobs = recoverableSubscribers;
+ this.recoverableIntakeFeedIds = recoverableIntakeFeedIds;
+ }
+
+ public List<Pair<FeedConnectJobInfo, List<String>>> getRecoverableSubscribers() {
+ return recoverableConnectJobs;
+ }
+
+ public Map<IFeedJoint, List<String>> getRecoverableIntakeFeedIds() {
+ return recoverableIntakeFeedIds;
+ }
+
+ }
+
+ @Override
+ public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
+ ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(joinedNodeId + " joined the cluster. " + "Asterix state: " + newState);
+ }
+
+ boolean needToReActivateFeeds = !newState.equals(state) && (newState == ClusterState.ACTIVE);
+ if (needToReActivateFeeds) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(joinedNodeId + " Resuming loser feeds (if any)");
+ }
+ try {
+ FeedsActivator activator = new FeedsActivator();
+ (new Thread(activator)).start();
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Exception in resuming feeds" + e.getMessage());
+ }
+ }
+ state = newState;
+ } else {
+ List<FeedCollectInfo> feedsThatCanBeRevived = new ArrayList<FeedCollectInfo>();
+ for (Entry<FeedCollectInfo, List<String>> entry : dependentFeeds.entrySet()) {
+ List<String> requiredNodeIds = entry.getValue();
+ if (requiredNodeIds.contains(joinedNodeId)) {
+ requiredNodeIds.remove(joinedNodeId);
+ if (requiredNodeIds.isEmpty()) {
+ feedsThatCanBeRevived.add(entry.getKey());
+ }
+ }
+ }
+ if (!feedsThatCanBeRevived.isEmpty()) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(joinedNodeId + " Resuming feeds after rejoining of node " + joinedNodeId);
+ }
+ FeedsActivator activator = new FeedsActivator(feedsThatCanBeRevived);
+ (new Thread(activator)).start();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
+ try {
+ responseInbox.put(response);
+ } catch (InterruptedException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Interrupted exception");
+ }
+ }
+ }
+
+ @Override
+ public void notifyStateChange(ClusterState previousState, ClusterState newState) {
+ switch (newState) {
+ case ACTIVE:
+ if (previousState.equals(ClusterState.UNUSABLE)) {
+ try {
+ FeedsActivator activator = new FeedsActivator();
+ // (new Thread(activator)).start();
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Exception in resuming feeds" + e.getMessage());
+ }
+ }
+ }
+ break;
+ }
+
+ }
+
+ public static class FeedsDeActivator implements Runnable {
+
+ private List<FeedConnectJobInfo> failedConnectjobs;
+
+ public FeedsDeActivator(List<FeedConnectJobInfo> failedConnectjobs) {
+ this.failedConnectjobs = failedConnectjobs;
+ }
+
+ @Override
+ public void run() {
+ for (FeedConnectJobInfo failedConnectJob : failedConnectjobs) {
+ endFeed(failedConnectJob);
+ }
+ }
+
+ private void endFeed(FeedConnectJobInfo cInfo) {
+ MetadataTransactionContext ctx = null;
+ PrintWriter writer = new PrintWriter(System.out, true);
+ SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
+
+ try {
+ ctx = MetadataManager.INSTANCE.beginTransaction();
+ FeedId feedId = cInfo.getConnectionId().getFeedId();
+ DisconnectFeedStatement stmt = new DisconnectFeedStatement(new Identifier(feedId.getDataverse()),
+ new Identifier(feedId.getFeedName()), new Identifier(cInfo.getConnectionId().getDatasetName()));
+ List<Statement> statements = new ArrayList<Statement>();
+ DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(feedId.getDataverse()));
+ statements.add(dataverseDecl);
+ statements.add(stmt);
+ AqlTranslator translator = new AqlTranslator(statements, pc);
+ translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
+ AqlTranslator.ResultDelivery.SYNC);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("End irrecoverable feed: " + cInfo.getConnectionId());
+ }
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Exception in ending loser feed: " + cInfo.getConnectionId() + " Exception "
+ + e.getMessage());
+ }
+ e.printStackTrace();
+ try {
+ MetadataManager.INSTANCE.abortTransaction(ctx);
+ } catch (Exception e2) {
+ e2.addSuppressed(e);
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Exception in aborting transaction! System is in inconsistent state");
+ }
+ }
+
+ }
+
+ }
+ }
+
+ public void submitFeedConnectionRequest(IFeedJoint feedPoint, FeedConnectionRequest subscriptionRequest)
+ throws Exception {
+ feedJobNotificationHandler.submitFeedConnectionRequest(feedPoint, subscriptionRequest);
+ }
+
+ @Override
+ public List<FeedConnectionId> getActiveFeedConnections(FeedId feedId) {
+ List<FeedConnectionId> connections = new ArrayList<FeedConnectionId>();
+ Collection<FeedConnectionId> activeConnections = feedJobNotificationHandler.getActiveFeedConnections();
+ if (feedId != null) {
+ for (FeedConnectionId connectionId : activeConnections) {
+ if (connectionId.getFeedId().equals(feedId)) {
+ connections.add(connectionId);
+ }
+ }
+ } else {
+ connections.addAll(activeConnections);
+ }
+ return connections;
+ }
+
+ @Override
+ public List<String> getComputeLocations(FeedId feedId) {
+ return feedJobNotificationHandler.getFeedComputeLocations(feedId);
+ }
+
+ @Override
+ public List<String> getIntakeLocations(FeedId feedId) {
+ return feedJobNotificationHandler.getFeedIntakeLocations(feedId);
+ }
+
+ @Override
+ public List<String> getStoreLocations(FeedConnectionId feedConnectionId) {
+ return feedJobNotificationHandler.getFeedStorageLocations(feedConnectionId);
+ }
+
+ @Override
+ public List<String> getCollectLocations(FeedConnectionId feedConnectionId) {
+ return feedJobNotificationHandler.getFeedCollectLocations(feedConnectionId);
+ }
+
+ @Override
+ public boolean isFeedConnectionActive(FeedConnectionId connectionId) {
+ return feedJobNotificationHandler.isFeedConnectionActive(connectionId);
+ }
+
+ public void reportPartialDisconnection(FeedConnectionId connectionId) {
+ feedJobNotificationHandler.removeFeedJointsPostPipelineTermination(connectionId);
+ }
+
+ public void registerFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
+ feedReportQueue.put(feedId, queue);
+ }
+
+ public void deregisterFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
+ feedReportQueue.remove(feedId);
+ }
+
+ public LinkedBlockingQueue<String> getFeedReportQueue(FeedConnectionId feedId) {
+ return feedReportQueue.get(feedId);
+ }
+
+ @Override
+ public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
+ return feedJobNotificationHandler.getAvailableFeedJoint(feedJointKey);
+ }
+
+ @Override
+ public boolean isFeedJointAvailable(FeedJointKey feedJointKey) {
+ return feedJobNotificationHandler.isFeedPointAvailable(feedJointKey);
+ }
+
+ public void registerFeedJoint(IFeedJoint feedJoint) {
+ feedJobNotificationHandler.registerFeedJoint(feedJoint);
+ }
+
+ public IFeedJoint getFeedJoint(FeedJointKey feedJointKey) {
+ return feedJobNotificationHandler.getFeedJoint(feedJointKey);
+ }
+
+ public void registerFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
+ feedJobNotificationHandler.registerFeedEventSubscriber(connectionId, subscriber);
+ }
+
+ public void deregisterFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
+ feedJobNotificationHandler.deregisterFeedEventSubscriber(connectionId, subscriber);
+
+ }
+
+ public JobSpecification getCollectJobSpecification(FeedConnectionId connectionId) {
+ return feedJobNotificationHandler.getCollectJobSpecification(connectionId);
+ }
+
+ public JobId getFeedCollectJobId(FeedConnectionId connectionId) {
+ return feedJobNotificationHandler.getFeedCollectJobId(connectionId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedLoadManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedLoadManager.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedLoadManager.java
new file mode 100644
index 0000000..cf94b28
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedLoadManager.java
@@ -0,0 +1,298 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedJobInfo.FeedJobState;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.NodeLoadReport;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLoadManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedTrackingManager;
+import edu.uci.ics.asterix.common.feeds.message.FeedCongestionMessage;
+import edu.uci.ics.asterix.common.feeds.message.FeedReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.ScaleInReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import edu.uci.ics.asterix.file.FeedOperations;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.PrepareStallMessage;
+import edu.uci.ics.asterix.metadata.feeds.TerminateDataFlowMessage;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedLoadManager implements IFeedLoadManager {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedLoadManager.class.getName());
+
+ private static final long MIN_MODIFICATION_INTERVAL = 180000; // 10 seconds
+ private final TreeSet<NodeLoadReport> nodeReports;
+ private final Map<FeedConnectionId, FeedActivity> feedActivities;
+ private final Map<String, Pair<Integer, Integer>> feedMetrics;
+
+ private FeedConnectionId lastModified;
+ private long lastModifiedTimestamp;
+
+ private static final int UNKNOWN = -1;
+
+ public FeedLoadManager() {
+ this.nodeReports = new TreeSet<NodeLoadReport>();
+ this.feedActivities = new HashMap<FeedConnectionId, FeedActivity>();
+ this.feedMetrics = new HashMap<String, Pair<Integer, Integer>>();
+ }
+
+ @Override
+ public void submitNodeLoadReport(NodeLoadReport report) {
+ nodeReports.remove(report);
+ nodeReports.add(report);
+ }
+
+ @Override
+ public void reportCongestion(FeedCongestionMessage message) throws AsterixException {
+ FeedRuntimeId runtimeId = message.getRuntimeId();
+ FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
+ if (jobState == null
+ || (jobState.equals(FeedJobState.UNDER_RECOVERY))
+ || (message.getConnectionId().equals(lastModified) && System.currentTimeMillis()
+ - lastModifiedTimestamp < MIN_MODIFICATION_INTERVAL)) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ignoring congestion report from " + runtimeId);
+ }
+ return;
+ } else {
+ try {
+ FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
+ int inflowRate = message.getInflowRate();
+ int outflowRate = message.getOutflowRate();
+ List<String> currentComputeLocations = new ArrayList<String>();
+ currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message
+ .getConnectionId().getFeedId()));
+ int computeCardinality = currentComputeLocations.size();
+ int requiredCardinality = (int) Math
+ .ceil((double) ((computeCardinality * inflowRate) / (double) outflowRate)) + 5;
+ int additionalComputeNodes = requiredCardinality - computeCardinality;
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("INCREASING COMPUTE CARDINALITY from " + computeCardinality + " by "
+ + additionalComputeNodes);
+ }
+
+ List<String> helperComputeNodes = getNodeForSubstitution(additionalComputeNodes);
+
+ // Step 1) Alter the original feed job to adjust the cardinality
+ JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
+ .getConnectionId());
+ helperComputeNodes.addAll(currentComputeLocations);
+ List<String> newLocations = new ArrayList<String>();
+ newLocations.addAll(currentComputeLocations);
+ newLocations.addAll(helperComputeNodes);
+ FeedUtil.increaseCardinality(jobSpec, FeedRuntimeType.COMPUTE, requiredCardinality, newLocations);
+
+ // Step 2) send prepare to stall message
+ gracefullyTerminateDataFlow(message.getConnectionId(), Integer.MAX_VALUE);
+
+ // Step 3) run the altered job specification
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("New Job after adjusting to the workload " + jobSpec);
+ }
+
+ Thread.sleep(10000);
+ runJob(jobSpec, false);
+ lastModified = message.getConnectionId();
+ lastModifiedTimestamp = System.currentTimeMillis();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Unable to form the required job for scaling in/out" + e.getMessage());
+ }
+ throw new AsterixException(e);
+ }
+ }
+ }
+
+ @Override
+ public void submitScaleInPossibleReport(ScaleInReportMessage message) throws Exception {
+ FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
+ if (jobState == null || (jobState.equals(FeedJobState.UNDER_RECOVERY))) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("JobState information for job " + "[" + message.getConnectionId() + "]" + " not found ");
+ }
+ return;
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Processing scale-in message " + message);
+ }
+ FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
+ JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
+ .getConnectionId());
+ int reducedCardinality = message.getReducedCardinaliy();
+ List<String> currentComputeLocations = new ArrayList<String>();
+ currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message.getConnectionId()
+ .getFeedId()));
+ FeedUtil.decreaseComputeCardinality(jobSpec, FeedRuntimeType.COMPUTE, reducedCardinality,
+ currentComputeLocations);
+
+ gracefullyTerminateDataFlow(message.getConnectionId(), reducedCardinality - 1);
+ Thread.sleep(3000);
+ JobId newJobId = runJob(jobSpec, false);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Launch modified job" + "[" + newJobId + "]" + "for scale-in \n" + jobSpec);
+ }
+
+ }
+ }
+
+ private void gracefullyTerminateDataFlow(FeedConnectionId connectionId, int computePartitionRetainLimit)
+ throws Exception {
+ // Step 1) send prepare to stall message
+ PrepareStallMessage stallMessage = new PrepareStallMessage(connectionId, computePartitionRetainLimit);
+ List<String> intakeLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
+ List<String> computeLocations = FeedLifecycleListener.INSTANCE.getComputeLocations(connectionId.getFeedId());
+ List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
+
+ Set<String> operatorLocations = new HashSet<String>();
+
+ operatorLocations.addAll(intakeLocations);
+ operatorLocations.addAll(computeLocations);
+ operatorLocations.addAll(storageLocations);
+
+ JobSpecification messageJobSpec = FeedOperations.buildPrepareStallMessageJob(stallMessage, operatorLocations);
+ runJob(messageJobSpec, true);
+
+ // Step 2)
+ TerminateDataFlowMessage terminateMesg = new TerminateDataFlowMessage(connectionId);
+ messageJobSpec = FeedOperations.buildTerminateFlowMessageJob(terminateMesg, intakeLocations);
+ runJob(messageJobSpec, true);
+ }
+
+ public static JobId runJob(JobSpecification spec, boolean waitForCompletion) throws Exception {
+ IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ JobId jobId = hcc.startJob(spec);
+ if (waitForCompletion) {
+ hcc.waitForCompletion(jobId);
+ }
+ return jobId;
+ }
+
+ @Override
+ public void submitFeedRuntimeReport(FeedReportMessage report) {
+ String key = "" + report.getConnectionId() + ":" + report.getRuntimeId().getFeedRuntimeType();
+ Pair<Integer, Integer> value = feedMetrics.get(key);
+ if (value == null) {
+ value = new Pair<Integer, Integer>(report.getValue(), 1);
+ feedMetrics.put(key, value);
+ } else {
+ value.first = value.first + report.getValue();
+ value.second = value.second + 1;
+ }
+ }
+
+ @Override
+ public int getOutflowRate(FeedConnectionId connectionId, FeedRuntimeType runtimeType) {
+ int rVal;
+ String key = "" + connectionId + ":" + runtimeType;
+ feedMetrics.get(key);
+ Pair<Integer, Integer> value = feedMetrics.get(key);
+ if (value == null) {
+ rVal = UNKNOWN;
+ } else {
+ rVal = value.first / value.second;
+ }
+ return rVal;
+ }
+
+ private List<String> getNodeForSubstitution(int nRequired) {
+ List<String> nodeIds = new ArrayList<String>();
+ Iterator<NodeLoadReport> it = null;
+ int nAdded = 0;
+ while (nAdded < nRequired) {
+ it = nodeReports.iterator();
+ while (it.hasNext()) {
+ nodeIds.add(it.next().getNodeId());
+ nAdded++;
+ }
+ }
+ return nodeIds;
+ }
+
+ @Override
+ public synchronized List<String> getNodes(int required) {
+ Iterator<NodeLoadReport> it;
+ List<String> allocated = new ArrayList<String>();
+ while (allocated.size() < required) {
+ it = nodeReports.iterator();
+ while (it.hasNext() && allocated.size() < required) {
+ allocated.add(it.next().getNodeId());
+ }
+ }
+ return allocated;
+ }
+
+ @Override
+ public void reportThrottlingEnabled(ThrottlingEnabledFeedMessage mesg) throws AsterixException, Exception {
+ System.out.println("Throttling Enabled for " + mesg.getConnectionId() + " " + mesg.getFeedRuntimeId());
+ FeedConnectionId connectionId = mesg.getConnectionId();
+ List<String> destinationLocations = new ArrayList<String>();
+ List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
+ List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
+
+ destinationLocations.addAll(storageLocations);
+ destinationLocations.addAll(collectLocations);
+ JobSpecification messageJobSpec = FeedOperations.buildNotifyThrottlingEnabledMessageJob(mesg,
+ destinationLocations);
+ runJob(messageJobSpec, true);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.warning("Acking disabled for " + mesg.getConnectionId() + " in view of activated throttling");
+ }
+ IFeedTrackingManager trackingManager = CentralFeedManager.getInstance().getFeedTrackingManager();
+ trackingManager.disableAcking(connectionId);
+ }
+
+ @Override
+ public void reportFeedActivity(FeedConnectionId connectionId, FeedActivity activity) {
+ feedActivities.put(connectionId, activity);
+ }
+
+ @Override
+ public FeedActivity getFeedActivity(FeedConnectionId connectionId) {
+ return feedActivities.get(connectionId);
+ }
+
+ @Override
+ public Collection<FeedActivity> getFeedActivities() {
+ return feedActivities.values();
+ }
+
+ @Override
+ public void removeFeedActivity(FeedConnectionId connectionId) {
+ feedActivities.remove(connectionId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedManager.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedManager.java
new file mode 100644
index 0000000..907ea2a
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedManager.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedMemoryManager;
+import edu.uci.ics.asterix.common.feeds.FeedMessageService;
+import edu.uci.ics.asterix.common.feeds.FeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.NodeLoadReportService;
+import edu.uci.ics.asterix.common.feeds.api.IFeedConnectionManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessageService;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetadataManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.api.IFeedSubscriptionManager;
+import edu.uci.ics.asterix.metadata.feeds.FeedConnectionManager;
+import edu.uci.ics.asterix.metadata.feeds.FeedSubscriptionManager;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An implementation of the IFeedManager interface.
+ * Provider necessary central repository for registering/retrieving
+ * artifacts/services associated with a feed.
+ */
+public class FeedManager implements IFeedManager {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedManager.class.getName());
+
+ private final IFeedSubscriptionManager feedSubscriptionManager;
+
+ private final IFeedConnectionManager feedConnectionManager;
+
+ private final IFeedMemoryManager feedMemoryManager;
+
+ private final IFeedMetricCollector feedMetricCollector;
+
+ private final IFeedMetadataManager feedMetadataManager;
+
+ private final IFeedMessageService feedMessageService;
+
+ private final NodeLoadReportService nodeLoadReportService;
+
+ private final AsterixFeedProperties asterixFeedProperties;
+
+ private final String nodeId;
+
+ private final int frameSize;
+
+ public FeedManager(String nodeId, AsterixFeedProperties feedProperties, int frameSize) throws AsterixException, HyracksDataException {
+ this.nodeId = nodeId;
+ this.feedSubscriptionManager = new FeedSubscriptionManager(nodeId);
+ this.feedConnectionManager = new FeedConnectionManager(nodeId);
+ this.feedMetadataManager = new FeedMetadataManager(nodeId);
+ this.feedMemoryManager = new FeedMemoryManager(nodeId, feedProperties, frameSize);
+ String ccClusterIp = AsterixClusterProperties.INSTANCE.getCluster() != null ? AsterixClusterProperties.INSTANCE
+ .getCluster().getMasterNode().getClusterIp() : "localhost";
+ this.feedMessageService = new FeedMessageService(feedProperties, nodeId, ccClusterIp);
+ this.nodeLoadReportService = new NodeLoadReportService(nodeId, this);
+ try {
+ this.feedMessageService.start();
+ this.nodeLoadReportService.start();
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to start feed services " + e.getMessage());
+ }
+ e.printStackTrace();
+ }
+ this.feedMetricCollector = new FeedMetricCollector(nodeId);
+ this.frameSize = frameSize;
+ this.asterixFeedProperties = feedProperties;
+ }
+
+ @Override
+ public IFeedSubscriptionManager getFeedSubscriptionManager() {
+ return feedSubscriptionManager;
+ }
+
+ @Override
+ public IFeedConnectionManager getFeedConnectionManager() {
+ return feedConnectionManager;
+ }
+
+ @Override
+ public IFeedMemoryManager getFeedMemoryManager() {
+ return feedMemoryManager;
+ }
+
+ @Override
+ public IFeedMetricCollector getFeedMetricCollector() {
+ return feedMetricCollector;
+ }
+
+ public int getFrameSize() {
+ return frameSize;
+ }
+
+ @Override
+ public IFeedMetadataManager getFeedMetadataManager() {
+ return feedMetadataManager;
+ }
+
+ @Override
+ public IFeedMessageService getFeedMessageService() {
+ return feedMessageService;
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public String toString() {
+ return "FeedManager " + "[" + nodeId + "]";
+ }
+
+ @Override
+ public AsterixFeedProperties getAsterixFeedProperties() {
+ return asterixFeedProperties;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedMessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedMessageReceiver.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedMessageReceiver.java
new file mode 100644
index 0000000..e16633f
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedMessageReceiver.java
@@ -0,0 +1,74 @@
+package edu.uci.ics.asterix.feeds;
+
+import java.util.logging.Level;
+
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitAckMessage;
+import edu.uci.ics.asterix.common.feeds.MessageReceiver;
+import edu.uci.ics.asterix.common.feeds.NodeLoadReport;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLoadManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage.MessageType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedTrackingManager;
+import edu.uci.ics.asterix.common.feeds.message.FeedCongestionMessage;
+import edu.uci.ics.asterix.common.feeds.message.FeedReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.ScaleInReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.StorageReportFeedMessage;
+import edu.uci.ics.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import edu.uci.ics.asterix.feeds.CentralFeedManager.AQLExecutor;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedBootstrap;
+
+public class FeedMessageReceiver extends MessageReceiver<String> {
+
+ private static boolean initialized;
+
+ private final IFeedLoadManager feedLoadManager;
+ private final IFeedTrackingManager feedTrackingManager;
+
+ public FeedMessageReceiver(CentralFeedManager centralFeedManager) {
+ this.feedLoadManager = centralFeedManager.getFeedLoadManager();
+ this.feedTrackingManager = centralFeedManager.getFeedTrackingManager();
+ }
+
+ @Override
+ public void processMessage(String message) throws Exception {
+ JSONObject obj = new JSONObject(message);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Received message " + obj);
+ }
+ MessageType messageType = MessageType.valueOf(obj.getString(FeedConstants.MessageConstants.MESSAGE_TYPE));
+ switch (messageType) {
+ case XAQL:
+ if (!initialized) {
+ FeedBootstrap.setUpInitialArtifacts();
+ initialized = true;
+ }
+ AQLExecutor.executeAQL(obj.getString(FeedConstants.MessageConstants.AQL));
+ break;
+ case CONGESTION:
+ feedLoadManager.reportCongestion(FeedCongestionMessage.read(obj));
+ break;
+ case FEED_REPORT:
+ feedLoadManager.submitFeedRuntimeReport(FeedReportMessage.read(obj));
+ break;
+ case NODE_REPORT:
+ feedLoadManager.submitNodeLoadReport(NodeLoadReport.read(obj));
+ break;
+ case SCALE_IN_REQUEST:
+ feedLoadManager.submitScaleInPossibleReport(ScaleInReportMessage.read(obj));
+ break;
+ case STORAGE_REPORT:
+ FeedLifecycleListener.INSTANCE.updateTrackingInformation(StorageReportFeedMessage.read(obj));
+ break;
+ case COMMIT_ACK:
+ feedTrackingManager.submitAckReport(FeedTupleCommitAckMessage.read(obj));
+ break;
+ case THROTTLING_ENABLED:
+ feedLoadManager.reportThrottlingEnabled(ThrottlingEnabledFeedMessage.read(obj));
+ default:
+ break;
+ }
+
+ }
+}