You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/02/22 23:35:12 UTC
[24/34] incubator-asterixdb git commit: Enabled Feed Tests and Added
External Library tests
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/feed/FeedJobNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedJobNotificationHandler.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedJobNotificationHandler.java
deleted file mode 100644
index 49b88ca..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedJobNotificationHandler.java
+++ /dev/null
@@ -1,742 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feed;
-
-import java.rmi.RemoteException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
-import org.apache.asterix.external.feed.api.IFeedJoint.State;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.management.FeedJointKey;
-import org.apache.asterix.external.feed.management.FeedWorkManager;
-import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.watch.FeedActivity;
-import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
-import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
-import org.apache.asterix.external.feed.watch.FeedJobInfo;
-import org.apache.asterix.external.feed.watch.FeedJobInfo.FeedJobState;
-import org.apache.asterix.external.feed.watch.FeedJobInfo.JobType;
-import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
-import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
-import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
-import org.apache.asterix.feed.FeedLifecycleListener.Message;
-import org.apache.asterix.feed.FeedWorkCollection.SubscribeFeedWork;
-import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobInfo;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
-
-public class FeedJobNotificationHandler implements Runnable {
-
- private static final Logger LOGGER = Logger.getLogger(FeedJobNotificationHandler.class.getName());
-
- private final LinkedBlockingQueue<Message> inbox;
- private final Map<FeedConnectionId, List<IFeedLifecycleEventSubscriber>> eventSubscribers;
-
- private final Map<JobId, FeedJobInfo> jobInfos;
- private final Map<FeedId, FeedIntakeInfo> intakeJobInfos;
- private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos;
- private final Map<FeedId, List<IFeedJoint>> feedPipeline;
- private final Map<FeedConnectionId, Pair<IIntakeProgressTracker, Long>> feedIntakeProgressTrackers;
-
- public FeedJobNotificationHandler(LinkedBlockingQueue<Message> inbox) {
- this.inbox = inbox;
- this.jobInfos = new HashMap<JobId, FeedJobInfo>();
- this.intakeJobInfos = new HashMap<FeedId, FeedIntakeInfo>();
- this.connectJobInfos = new HashMap<FeedConnectionId, FeedConnectJobInfo>();
- this.feedPipeline = new HashMap<FeedId, List<IFeedJoint>>();
- this.eventSubscribers = new HashMap<FeedConnectionId, List<IFeedLifecycleEventSubscriber>>();
- this.feedIntakeProgressTrackers = new HashMap<FeedConnectionId, Pair<IIntakeProgressTracker, Long>>();
- }
-
- @Override
- public void run() {
- Message mesg;
- while (true) {
- try {
- mesg = inbox.take();
- switch (mesg.messageKind) {
- case JOB_START:
- handleJobStartMessage(mesg);
- break;
- case JOB_FINISH:
- handleJobFinishMessage(mesg);
- break;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
- }
-
- public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId,
- IIntakeProgressTracker feedIntakeProgressTracker) {
- if (feedIntakeProgressTrackers.get(connectionId) == null) {
- this.feedIntakeProgressTrackers.put(connectionId, new Pair<IIntakeProgressTracker, Long>(
- feedIntakeProgressTracker, 0L));
- } else {
- throw new IllegalStateException(" Progress tracker for connection " + connectionId
- + " is alreader registered");
- }
- }
-
- public void deregisterFeedIntakeProgressTracker(FeedConnectionId connectionId) {
- this.feedIntakeProgressTrackers.remove(connectionId);
- }
-
- public void updateTrackingInformation(StorageReportFeedMessage srm) {
- Pair<IIntakeProgressTracker, Long> p = feedIntakeProgressTrackers.get(srm.getConnectionId());
- if (p != null && p.second < srm.getLastPersistedTupleIntakeTimestamp()) {
- p.second = srm.getLastPersistedTupleIntakeTimestamp();
- p.first.notifyIngestedTupleTimestamp(p.second);
- }
- }
-
- public Collection<FeedIntakeInfo> getFeedIntakeInfos() {
- return intakeJobInfos.values();
- }
-
- public Collection<FeedConnectJobInfo> getFeedConnectInfos() {
- return connectJobInfos.values();
- }
-
- public void registerFeedJoint(IFeedJoint feedJoint) {
- List<IFeedJoint> feedJointsOnPipeline = feedPipeline.get(feedJoint.getOwnerFeedId());
- if (feedJointsOnPipeline == null) {
- feedJointsOnPipeline = new ArrayList<IFeedJoint>();
- feedPipeline.put(feedJoint.getOwnerFeedId(), feedJointsOnPipeline);
- feedJointsOnPipeline.add(feedJoint);
- } else {
- if (!feedJointsOnPipeline.contains(feedJoint)) {
- feedJointsOnPipeline.add(feedJoint);
- } else {
- throw new IllegalArgumentException("Feed joint " + feedJoint + " already registered");
- }
- }
- }
-
- public void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpec) throws HyracksDataException {
- if (jobInfos.get(jobId) != null) {
- throw new IllegalStateException("Feed job already registered");
- }
-
- List<IFeedJoint> joints = feedPipeline.get(feedId);
- IFeedJoint intakeJoint = null;
- for (IFeedJoint joint : joints) {
- if (joint.getType().equals(IFeedJoint.FeedJointType.INTAKE)) {
- intakeJoint = joint;
- break;
- }
- }
-
- if (intakeJoint != null) {
- FeedIntakeInfo intakeJobInfo = new FeedIntakeInfo(jobId, FeedJobState.CREATED, FeedJobInfo.JobType.INTAKE,
- feedId, intakeJoint, jobSpec);
- intakeJobInfos.put(feedId, intakeJobInfo);
- jobInfos.put(jobId, intakeJobInfo);
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Registered feed intake [" + jobId + "]" + " for feed " + feedId);
- }
- } else {
- throw new HyracksDataException("Could not register feed intake job [" + jobId + "]" + " for feed "
- + feedId);
- }
- }
-
- public void registerFeedCollectionJob(FeedId sourceFeedId, FeedConnectionId connectionId, JobId jobId,
- JobSpecification jobSpec, Map<String, String> feedPolicy) {
- if (jobInfos.get(jobId) != null) {
- throw new IllegalStateException("Feed job already registered");
- }
-
- List<IFeedJoint> feedJoints = feedPipeline.get(sourceFeedId);
- FeedConnectionId cid = null;
- IFeedJoint sourceFeedJoint = null;
- for (IFeedJoint joint : feedJoints) {
- cid = joint.getReceiver(connectionId);
- if (cid != null) {
- sourceFeedJoint = joint;
- break;
- }
- }
-
- if (cid != null) {
- FeedConnectJobInfo cInfo = new FeedConnectJobInfo(jobId, FeedJobState.CREATED, connectionId,
- sourceFeedJoint, null, jobSpec, feedPolicy);
- jobInfos.put(jobId, cInfo);
- connectJobInfos.put(connectionId, cInfo);
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Registered feed connection [" + jobId + "]" + " for feed " + connectionId);
- }
- } else {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Could not register feed collection job [" + jobId + "]" + " for feed connection "
- + connectionId);
- }
- }
-
- }
-
- public void deregisterFeedIntakeJob(JobId jobId) {
- if (jobInfos.get(jobId) == null) {
- throw new IllegalStateException(" Feed Intake job not registered ");
- }
-
- FeedIntakeInfo info = (FeedIntakeInfo) jobInfos.get(jobId);
- jobInfos.remove(jobId);
- intakeJobInfos.remove(info.getFeedId());
-
- if (!info.getState().equals(FeedJobState.UNDER_RECOVERY)) {
- List<IFeedJoint> joints = feedPipeline.get(info.getFeedId());
- joints.remove(info.getIntakeFeedJoint());
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Deregistered feed intake job [" + jobId + "]");
- }
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Not removing feed joint as intake job is in " + FeedJobState.UNDER_RECOVERY + " state.");
- }
- }
-
- }
-
- private void handleJobStartMessage(Message message) throws Exception {
- FeedJobInfo jobInfo = jobInfos.get(message.jobId);
- switch (jobInfo.getJobType()) {
- case INTAKE:
- handleIntakeJobStartMessage((FeedIntakeInfo) jobInfo);
- break;
- case FEED_CONNECT:
- handleCollectJobStartMessage((FeedConnectJobInfo) jobInfo);
- break;
- }
-
- }
-
- private void handleJobFinishMessage(Message message) throws Exception {
- FeedJobInfo jobInfo = jobInfos.get(message.jobId);
- switch (jobInfo.getJobType()) {
- case INTAKE:
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Intake Job finished for feed intake " + jobInfo.getJobId());
- }
- handleFeedIntakeJobFinishMessage((FeedIntakeInfo) jobInfo, message);
- break;
- case FEED_CONNECT:
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Collect Job finished for " + (FeedConnectJobInfo) jobInfo);
- }
- handleFeedCollectJobFinishMessage((FeedConnectJobInfo) jobInfo);
- break;
- }
-
- }
-
- private synchronized void handleIntakeJobStartMessage(FeedIntakeInfo intakeJobInfo) throws Exception {
- List<OperatorDescriptorId> intakeOperatorIds = new ArrayList<OperatorDescriptorId>();
- Map<OperatorDescriptorId, IOperatorDescriptor> operators = intakeJobInfo.getSpec().getOperatorMap();
- for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
- IOperatorDescriptor opDesc = entry.getValue();
- if (opDesc instanceof FeedIntakeOperatorDescriptor) {
- intakeOperatorIds.add(opDesc.getOperatorId());
- }
- }
-
- IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
- JobInfo info = hcc.getJobInfo(intakeJobInfo.getJobId());
- List<String> intakeLocations = new ArrayList<String>();
- for (OperatorDescriptorId intakeOperatorId : intakeOperatorIds) {
- Map<Integer, String> operatorLocations = info.getOperatorLocations().get(intakeOperatorId);
- int nOperatorInstances = operatorLocations.size();
- for (int i = 0; i < nOperatorInstances; i++) {
- intakeLocations.add(operatorLocations.get(i));
- }
- }
- // intakeLocations is an ordered list; element at position i corresponds to location of i'th instance of operator
- intakeJobInfo.setIntakeLocation(intakeLocations);
- intakeJobInfo.getIntakeFeedJoint().setState(State.ACTIVE);
- intakeJobInfo.setState(FeedJobState.ACTIVE);
-
- // notify event listeners
- notifyFeedEventSubscribers(intakeJobInfo, FeedLifecycleEvent.FEED_INTAKE_STARTED);
- }
-
- private void handleCollectJobStartMessage(FeedConnectJobInfo cInfo) throws RemoteException, ACIDException {
- // set locations of feed sub-operations (intake, compute, store)
- setLocations(cInfo);
-
- // activate joints
- List<IFeedJoint> joints = feedPipeline.get(cInfo.getConnectionId().getFeedId());
- for (IFeedJoint joint : joints) {
- if (joint.getProvider().equals(cInfo.getConnectionId())) {
- joint.setState(State.ACTIVE);
- if (joint.getType().equals(IFeedJoint.FeedJointType.COMPUTE)) {
- cInfo.setComputeFeedJoint(joint);
- }
- }
- }
- cInfo.setState(FeedJobState.ACTIVE);
-
- // register activity in metadata
- registerFeedActivity(cInfo);
- // notify event listeners
- notifyFeedEventSubscribers(cInfo, FeedLifecycleEvent.FEED_COLLECT_STARTED);
- }
-
- private void notifyFeedEventSubscribers(FeedJobInfo jobInfo, FeedLifecycleEvent event) {
- JobType jobType = jobInfo.getJobType();
- List<FeedConnectionId> impactedConnections = new ArrayList<FeedConnectionId>();
- if (jobType.equals(JobType.INTAKE)) {
- FeedId feedId = ((FeedIntakeInfo) jobInfo).getFeedId();
- for (FeedConnectionId connId : eventSubscribers.keySet()) {
- if (connId.getFeedId().equals(feedId)) {
- impactedConnections.add(connId);
- }
- }
- } else {
- impactedConnections.add(((FeedConnectJobInfo) jobInfo).getConnectionId());
- }
-
- for (FeedConnectionId connId : impactedConnections) {
- List<IFeedLifecycleEventSubscriber> subscribers = eventSubscribers.get(connId);
- if (subscribers != null && !subscribers.isEmpty()) {
- for (IFeedLifecycleEventSubscriber subscriber : subscribers) {
- subscriber.handleFeedEvent(event);
- }
- }
- }
- }
-
- public synchronized void submitFeedConnectionRequest(IFeedJoint feedJoint, final FeedConnectionRequest request)
- throws Exception {
- List<String> locations = null;
- switch (feedJoint.getType()) {
- case INTAKE:
- FeedIntakeInfo intakeInfo = intakeJobInfos.get(feedJoint.getOwnerFeedId());
- locations = intakeInfo.getIntakeLocation();
- break;
- case COMPUTE:
- FeedConnectionId connectionId = feedJoint.getProvider();
- FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
- locations = cInfo.getComputeLocations();
- break;
- }
-
- SubscribeFeedWork work = new SubscribeFeedWork(locations.toArray(new String[] {}), request);
- FeedWorkManager.INSTANCE.submitWork(work, new SubscribeFeedWork.FeedSubscribeWorkEventListener());
- }
-
- public IFeedJoint getSourceFeedJoint(FeedConnectionId connectionId) {
- FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
- if (cInfo != null) {
- return cInfo.getSourceFeedJoint();
- }
- return null;
- }
-
- public Set<FeedConnectionId> getActiveFeedConnections() {
- Set<FeedConnectionId> activeConnections = new HashSet<FeedConnectionId>();
- for (FeedConnectJobInfo cInfo : connectJobInfos.values()) {
- if (cInfo.getState().equals(FeedJobState.ACTIVE)) {
- activeConnections.add(cInfo.getConnectionId());
- }
- }
- return activeConnections;
- }
-
- public boolean isFeedConnectionActive(FeedConnectionId connectionId) {
- FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
- if (cInfo != null) {
- return cInfo.getState().equals(FeedJobState.ACTIVE);
- }
- return false;
- }
-
- public void setJobState(FeedConnectionId connectionId, FeedJobState jobState) {
- FeedConnectJobInfo connectJobInfo = connectJobInfos.get(connectionId);
- connectJobInfo.setState(jobState);
- }
-
- public FeedJobState getFeedJobState(FeedConnectionId connectionId) {
- return connectJobInfos.get(connectionId).getState();
- }
-
- private void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, Message message) throws Exception {
- IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
- JobInfo info = hcc.getJobInfo(message.jobId);
- JobStatus status = info.getStatus();
- FeedLifecycleEvent event;
- event = status.equals(JobStatus.FAILURE) ? FeedLifecycleEvent.FEED_INTAKE_FAILURE
- : FeedLifecycleEvent.FEED_ENDED;
-
- // remove feed joints
- deregisterFeedIntakeJob(message.jobId);
-
- // notify event listeners
- notifyFeedEventSubscribers(intakeInfo, event);
-
- }
-
- private void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
- FeedConnectionId connectionId = cInfo.getConnectionId();
-
- IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
- JobInfo info = hcc.getJobInfo(cInfo.getJobId());
- JobStatus status = info.getStatus();
- boolean failure = status != null && status.equals(JobStatus.FAILURE);
- FeedPolicyAccessor fpa = new FeedPolicyAccessor(cInfo.getFeedPolicy());
-
- boolean removeJobHistory = !failure;
- boolean retainSubsription = cInfo.getState().equals(FeedJobState.UNDER_RECOVERY)
- || (failure && fpa.continueOnHardwareFailure());
-
- if (!retainSubsription) {
- IFeedJoint feedJoint = cInfo.getSourceFeedJoint();
- feedJoint.removeReceiver(connectionId);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription");
- }
- removeFeedJointsPostPipelineTermination(cInfo.getConnectionId());
- }
-
- if (removeJobHistory) {
- connectJobInfos.remove(connectionId);
- jobInfos.remove(cInfo.getJobId());
- feedIntakeProgressTrackers.remove(cInfo.getConnectionId());
- }
- deregisterFeedActivity(cInfo);
-
- // notify event listeners
- FeedLifecycleEvent event = failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_ENDED;
- notifyFeedEventSubscribers(cInfo, event);
- }
-
- private void registerFeedActivity(FeedConnectJobInfo cInfo) {
- Map<String, String> feedActivityDetails = new HashMap<String, String>();
-
- if (cInfo.getCollectLocations() != null) {
- feedActivityDetails.put(FeedActivity.FeedActivityDetails.INTAKE_LOCATIONS,
- StringUtils.join(cInfo.getCollectLocations().iterator(), ','));
- }
-
- if (cInfo.getComputeLocations() != null) {
- feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS,
- StringUtils.join(cInfo.getComputeLocations().iterator(), ','));
- }
-
- if (cInfo.getStorageLocations() != null) {
- feedActivityDetails.put(FeedActivity.FeedActivityDetails.STORAGE_LOCATIONS,
- StringUtils.join(cInfo.getStorageLocations().iterator(), ','));
- }
-
- String policyName = cInfo.getFeedPolicy().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
- feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, policyName);
-
- feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_CONNECT_TIMESTAMP, (new Date()).toString());
- try {
- FeedActivity feedActivity = new FeedActivity(cInfo.getConnectionId().getFeedId().getDataverse(), cInfo
- .getConnectionId().getFeedId().getFeedName(), cInfo.getConnectionId().getDatasetName(),
- feedActivityDetails);
- CentralFeedManager.getInstance().getFeedLoadManager()
- .reportFeedActivity(cInfo.getConnectionId(), feedActivity);
-
- } catch (Exception e) {
- e.printStackTrace();
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to register feed activity for " + cInfo + " " + e.getMessage());
- }
-
- }
-
- }
-
- public void deregisterFeedActivity(FeedConnectJobInfo cInfo) {
- try {
- CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(cInfo.getConnectionId());
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to deregister feed activity for " + cInfo + " " + e.getMessage());
- }
- }
- }
-
- public void removeFeedJointsPostPipelineTermination(FeedConnectionId connectionId) {
- FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
- List<IFeedJoint> feedJoints = feedPipeline.get(connectionId.getFeedId());
-
- IFeedJoint sourceJoint = cInfo.getSourceFeedJoint();
- List<FeedConnectionId> all = sourceJoint.getReceivers();
- boolean removeSourceJoint = all.size() < 2;
- if (removeSourceJoint) {
- feedJoints.remove(sourceJoint);
- }
-
- IFeedJoint computeJoint = cInfo.getComputeFeedJoint();
- if (computeJoint != null && computeJoint.getReceivers().size() < 2) {
- feedJoints.remove(computeJoint);
- }
- }
-
- public boolean isRegisteredFeedJob(JobId jobId) {
- return jobInfos.get(jobId) != null;
- }
-
- public List<String> getFeedComputeLocations(FeedId feedId) {
- List<IFeedJoint> feedJoints = feedPipeline.get(feedId);
- for (IFeedJoint joint : feedJoints) {
- if (joint.getFeedJointKey().getFeedId().equals(feedId)) {
- return connectJobInfos.get(joint.getProvider()).getComputeLocations();
- }
- }
- return null;
- }
-
- public List<String> getFeedStorageLocations(FeedConnectionId connectionId) {
- return connectJobInfos.get(connectionId).getStorageLocations();
- }
-
- public List<String> getFeedCollectLocations(FeedConnectionId connectionId) {
- return connectJobInfos.get(connectionId).getCollectLocations();
- }
-
- public List<String> getFeedIntakeLocations(FeedId feedId) {
- return intakeJobInfos.get(feedId).getIntakeLocation();
- }
-
- public JobId getFeedCollectJobId(FeedConnectionId connectionId) {
- return connectJobInfos.get(connectionId).getJobId();
- }
-
- public void registerFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
- List<IFeedLifecycleEventSubscriber> subscribers = eventSubscribers.get(connectionId);
- if (subscribers == null) {
- subscribers = new ArrayList<IFeedLifecycleEventSubscriber>();
- eventSubscribers.put(connectionId, subscribers);
- }
- subscribers.add(subscriber);
- }
-
- public void deregisterFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
- List<IFeedLifecycleEventSubscriber> subscribers = eventSubscribers.get(connectionId);
- if (subscribers != null) {
- subscribers.remove(subscriber);
- }
- }
-
- //============================
-
- public boolean isFeedPointAvailable(FeedJointKey feedJointKey) {
- List<IFeedJoint> joints = feedPipeline.get(feedJointKey.getFeedId());
- if (joints != null && !joints.isEmpty()) {
- for (IFeedJoint joint : joints) {
- if (joint.getFeedJointKey().equals(feedJointKey)) {
- return true;
- }
- }
- }
- return false;
- }
-
- public Collection<IFeedJoint> getFeedIntakeJoints() {
- List<IFeedJoint> intakeFeedPoints = new ArrayList<IFeedJoint>();
- for (FeedIntakeInfo info : intakeJobInfos.values()) {
- intakeFeedPoints.add(info.getIntakeFeedJoint());
- }
- return intakeFeedPoints;
- }
-
- public IFeedJoint getFeedJoint(FeedJointKey feedPointKey) {
- List<IFeedJoint> joints = feedPipeline.get(feedPointKey.getFeedId());
- if (joints != null && !joints.isEmpty()) {
- for (IFeedJoint joint : joints) {
- if (joint.getFeedJointKey().equals(feedPointKey)) {
- return joint;
- }
- }
- }
- return null;
- }
-
- public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
- IFeedJoint feedJoint = getFeedJoint(feedJointKey);
- if (feedJoint != null) {
- return feedJoint;
- } else {
- String jointKeyString = feedJointKey.getStringRep();
- List<IFeedJoint> jointsOnPipeline = feedPipeline.get(feedJointKey.getFeedId());
- IFeedJoint candidateJoint = null;
- if (jointsOnPipeline != null) {
- for (IFeedJoint joint : jointsOnPipeline) {
- if (jointKeyString.contains(joint.getFeedJointKey().getStringRep())) {
- if (candidateJoint == null) {
- candidateJoint = joint;
- } else if (joint.getFeedJointKey().getStringRep()
- .contains(candidateJoint.getFeedJointKey().getStringRep())) { // found feed point is a super set of the earlier find
- candidateJoint = joint;
- }
- }
- }
- }
- return candidateJoint;
- }
- }
-
- public JobSpecification getCollectJobSpecification(FeedConnectionId connectionId) {
- return connectJobInfos.get(connectionId).getSpec();
- }
-
- public IFeedJoint getFeedPoint(FeedId sourceFeedId, IFeedJoint.FeedJointType type) {
- List<IFeedJoint> joints = feedPipeline.get(sourceFeedId);
- for (IFeedJoint joint : joints) {
- if (joint.getType().equals(type)) {
- return joint;
- }
- }
- return null;
- }
-
- public FeedConnectJobInfo getFeedConnectJobInfo(FeedConnectionId connectionId) {
- return connectJobInfos.get(connectionId);
- }
-
- private void setLocations(FeedConnectJobInfo cInfo) {
- JobSpecification jobSpec = cInfo.getSpec();
-
- List<OperatorDescriptorId> collectOperatorIds = new ArrayList<OperatorDescriptorId>();
- List<OperatorDescriptorId> computeOperatorIds = new ArrayList<OperatorDescriptorId>();
- List<OperatorDescriptorId> storageOperatorIds = new ArrayList<OperatorDescriptorId>();
-
- Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
- for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
- IOperatorDescriptor opDesc = entry.getValue();
- IOperatorDescriptor actualOp = null;
- if (opDesc instanceof FeedMetaOperatorDescriptor) {
- actualOp = ((FeedMetaOperatorDescriptor) opDesc).getCoreOperator();
- } else {
- actualOp = opDesc;
- }
-
- if (actualOp instanceof AlgebricksMetaOperatorDescriptor) {
- AlgebricksMetaOperatorDescriptor op = ((AlgebricksMetaOperatorDescriptor) actualOp);
- IPushRuntimeFactory[] runtimeFactories = op.getPipeline().getRuntimeFactories();
- boolean computeOp = false;
- for (IPushRuntimeFactory rf : runtimeFactories) {
- if (rf instanceof AssignRuntimeFactory) {
- IConnectorDescriptor connDesc = jobSpec.getOperatorInputMap().get(op.getOperatorId()).get(0);
- IOperatorDescriptor sourceOp = jobSpec.getConnectorOperatorMap().get(connDesc.getConnectorId())
- .getLeft().getLeft();
- if (sourceOp instanceof FeedCollectOperatorDescriptor) {
- computeOp = true;
- break;
- }
- }
- }
- if (computeOp) {
- computeOperatorIds.add(entry.getKey());
- }
- } else if (actualOp instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
- storageOperatorIds.add(entry.getKey());
- } else if (actualOp instanceof FeedCollectOperatorDescriptor) {
- collectOperatorIds.add(entry.getKey());
- }
- }
-
- try {
- IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
- JobInfo info = hcc.getJobInfo(cInfo.getJobId());
- List<String> collectLocations = new ArrayList<String>();
- for (OperatorDescriptorId collectOpId : collectOperatorIds) {
- Map<Integer, String> operatorLocations = info.getOperatorLocations().get(collectOpId);
- int nOperatorInstances = operatorLocations.size();
- for (int i = 0; i < nOperatorInstances; i++) {
- collectLocations.add(operatorLocations.get(i));
- }
- }
-
- List<String> computeLocations = new ArrayList<String>();
- for (OperatorDescriptorId computeOpId : computeOperatorIds) {
- Map<Integer, String> operatorLocations = info.getOperatorLocations().get(computeOpId);
- if (operatorLocations != null) {
- int nOperatorInstances = operatorLocations.size();
- for (int i = 0; i < nOperatorInstances; i++) {
- computeLocations.add(operatorLocations.get(i));
- }
- } else {
- computeLocations.clear();
- computeLocations.addAll(collectLocations);
- }
- }
-
- List<String> storageLocations = new ArrayList<String>();
- for (OperatorDescriptorId storageOpId : storageOperatorIds) {
- Map<Integer, String> operatorLocations = info.getOperatorLocations().get(storageOpId);
- if (operatorLocations == null) {
- continue;
- }
- int nOperatorInstances = operatorLocations.size();
- for (int i = 0; i < nOperatorInstances; i++) {
- storageLocations.add(operatorLocations.get(i));
- }
- }
- cInfo.setCollectLocations(collectLocations);
- cInfo.setComputeLocations(computeLocations);
- cInfo.setStorageLocations(storageLocations);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/feed/FeedJoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedJoint.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedJoint.java
deleted file mode 100644
index 43f227d..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedJoint.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feed;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.management.FeedJointKey;
-
-public class FeedJoint implements IFeedJoint {
-
- private static final Logger LOGGER = Logger.getLogger(FeedJoint.class.getName());
-
- /** A unique key associated with the feed point **/
- private final FeedJointKey key;
-
- /** The state associated with the FeedJoint **/
- private State state;
-
- /** A list of subscribers that receive data from this FeedJoint **/
- private final List<FeedConnectionId> receivers;
-
- /** The feedId on which the feedPoint resides **/
- private final FeedId ownerFeedId;
-
- /** A list of feed subscription requests submitted for subscribing to the FeedPoint's data **/
- private final List<FeedConnectionRequest> connectionRequests;
-
- private final ConnectionLocation connectionLocation;
-
- private final FeedJointType type;
-
- private FeedConnectionId provider;
-
- public FeedJoint(FeedJointKey key, FeedId ownerFeedId, ConnectionLocation subscriptionLocation, FeedJointType type,
- FeedConnectionId provider) {
- this.key = key;
- this.ownerFeedId = ownerFeedId;
- this.type = type;
- this.receivers = new ArrayList<FeedConnectionId>();
- this.state = State.CREATED;
- this.connectionLocation = subscriptionLocation;
- this.connectionRequests = new ArrayList<FeedConnectionRequest>();
- this.provider = provider;
- }
-
- @Override
- public int hashCode() {
- return key.hashCode();
- }
-
- public void addReceiver(FeedConnectionId connectionId) {
- receivers.add(connectionId);
- }
-
- public void removeReceiver(FeedConnectionId connectionId) {
- receivers.remove(connectionId);
- }
-
- public synchronized void addConnectionRequest(FeedConnectionRequest request) {
- connectionRequests.add(request);
- if (state.equals(State.ACTIVE)) {
- handlePendingConnectionRequest();
- }
- }
-
- public synchronized void setState(State state) {
- if (this.state.equals(state)) {
- return;
- }
- this.state = state;
- if (this.state.equals(State.ACTIVE)) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Feed joint " + this + " is now " + State.ACTIVE);
- }
- handlePendingConnectionRequest();
- }
- }
-
- private void handlePendingConnectionRequest() {
- for (FeedConnectionRequest connectionRequest : connectionRequests) {
- FeedConnectionId connectionId = new FeedConnectionId(connectionRequest.getReceivingFeedId(),
- connectionRequest.getTargetDataset());
- try {
- FeedLifecycleListener.INSTANCE.submitFeedConnectionRequest(this, connectionRequest);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Submitted feed connection request " + connectionRequest + " at feed joint " + this);
- }
- addReceiver(connectionId);
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unsuccessful attempt at submitting connection request " + connectionRequest
- + " at feed joint " + this + ". Message " + e.getMessage());
- }
- e.printStackTrace();
- }
- }
- connectionRequests.clear();
- }
-
- public FeedConnectionId getReceiver(FeedConnectionId connectionId) {
- for (FeedConnectionId cid : receivers) {
- if (cid.equals(connectionId)) {
- return cid;
- }
- }
- return null;
- }
-
- @Override
- public String toString() {
- return key.toString() + " [" + connectionLocation + "]" + "[" + state + "]";
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == null) {
- return false;
- }
- if (o == this) {
- return true;
- }
- if (!(o instanceof FeedJoint)) {
- return false;
- }
- return ((FeedJoint) o).getFeedJointKey().equals(this.key);
- }
-
- public FeedId getOwnerFeedId() {
- return ownerFeedId;
- }
-
- public List<FeedConnectionRequest> getConnectionRequests() {
- return connectionRequests;
- }
-
- public ConnectionLocation getConnectionLocation() {
- return connectionLocation;
- }
-
- public FeedJointType getType() {
- return type;
- }
-
- @Override
- public FeedConnectionId getProvider() {
- return provider;
- }
-
- public List<FeedConnectionId> getReceivers() {
- return receivers;
- }
-
- public FeedJointKey getKey() {
- return key;
- }
-
- public synchronized State getState() {
- return state;
- }
-
- @Override
- public FeedJointKey getFeedJointKey() {
- return key;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/feed/FeedLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedLifecycleListener.java
deleted file mode 100644
index aac3675..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedLifecycleListener.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feed;
-
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.QueryTranslator;
-import org.apache.asterix.common.api.IClusterManagementWork;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.api.IClusterManagementWorkResponse;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IFeedLifecycleListener;
-import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
-import org.apache.asterix.external.feed.management.FeedCollectInfo;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.management.FeedJointKey;
-import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
-import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
-import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
-import org.apache.asterix.external.feed.watch.FeedJobInfo;
-import org.apache.asterix.external.feed.watch.FeedJobInfo.FeedJobState;
-import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
-import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.statement.DataverseDecl;
-import org.apache.asterix.lang.common.statement.DisconnectFeedStatement;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.cluster.AddNodeWork;
-import org.apache.asterix.metadata.cluster.ClusterManager;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-/**
- * A listener that subscribes to events associated with cluster membership
- * (nodes joining/leaving the cluster) and job lifecycle (start/end of a job).
- * Subscription to such events allows keeping track of feed ingestion jobs and
- * take any corrective action that may be required when a node involved in a
- * feed leaves the cluster.
- */
-public class FeedLifecycleListener implements IFeedLifecycleListener {
-
- private static final Logger LOGGER = Logger.getLogger(FeedLifecycleListener.class.getName());
-
- public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
- private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
-
- private final LinkedBlockingQueue<Message> jobEventInbox;
- private final LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
- private final Map<FeedCollectInfo, List<String>> dependentFeeds = new HashMap<FeedCollectInfo, List<String>>();
- private final Map<FeedConnectionId, LinkedBlockingQueue<String>> feedReportQueue;
- private final FeedJobNotificationHandler feedJobNotificationHandler;
- private final FeedWorkRequestResponseHandler feedWorkRequestResponseHandler;
- private final ExecutorService executorService;
-
- private ClusterState state;
-
- private FeedLifecycleListener() {
- this.jobEventInbox = new LinkedBlockingQueue<Message>();
- this.feedJobNotificationHandler = new FeedJobNotificationHandler(jobEventInbox);
- this.responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>();
- this.feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox);
- this.feedReportQueue = new HashMap<FeedConnectionId, LinkedBlockingQueue<String>>();
- this.executorService = Executors.newCachedThreadPool();
- this.executorService.execute(feedJobNotificationHandler);
- this.executorService.execute(feedWorkRequestResponseHandler);
- ClusterManager.INSTANCE.registerSubscriber(this);
- this.state = AsterixClusterProperties.INSTANCE.getState();
- }
-
- @Override
- public void notifyJobStart(JobId jobId) throws HyracksException {
- if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
- jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_START));
- }
- }
-
- @Override
- public void notifyJobFinish(JobId jobId) throws HyracksException {
- if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
- jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_FINISH));
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
- }
- }
- }
-
- public FeedConnectJobInfo getFeedConnectJobInfo(FeedConnectionId connectionId) {
- return feedJobNotificationHandler.getFeedConnectJobInfo(connectionId);
- }
-
- public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId,
- IIntakeProgressTracker feedIntakeProgressTracker) {
- feedJobNotificationHandler.registerFeedIntakeProgressTracker(connectionId, feedIntakeProgressTracker);
- }
-
- public void deregisterFeedIntakeProgressTracker(FeedConnectionId connectionId) {
- feedJobNotificationHandler.deregisterFeedIntakeProgressTracker(connectionId);
- }
-
- public void updateTrackingInformation(StorageReportFeedMessage srm) {
- feedJobNotificationHandler.updateTrackingInformation(srm);
- }
-
- /*
- * Traverse job specification to categorize job as a feed intake job or a feed collection job
- */
- @Override
- public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
- JobSpecification spec = acggf.getJobSpecification();
- FeedConnectionId feedConnectionId = null;
- Map<String, String> feedPolicy = null;
- for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
- if (opDesc instanceof FeedCollectOperatorDescriptor) {
- feedConnectionId = ((FeedCollectOperatorDescriptor) opDesc).getFeedConnectionId();
- feedPolicy = ((FeedCollectOperatorDescriptor) opDesc).getFeedPolicyProperties();
- feedJobNotificationHandler.registerFeedCollectionJob(
- ((FeedCollectOperatorDescriptor) opDesc).getSourceFeedId(), feedConnectionId, jobId, spec,
- feedPolicy);
- break;
- } else if (opDesc instanceof FeedIntakeOperatorDescriptor) {
- feedJobNotificationHandler.registerFeedIntakeJob(((FeedIntakeOperatorDescriptor) opDesc).getFeedId(),
- jobId, spec);
- break;
- }
- }
- }
-
- public void setJobState(FeedConnectionId connectionId, FeedJobState jobState) {
- feedJobNotificationHandler.setJobState(connectionId, jobState);
- }
-
- public FeedJobState getFeedJobState(FeedConnectionId connectionId) {
- return feedJobNotificationHandler.getFeedJobState(connectionId);
- }
-
- public static class Message {
- public JobId jobId;
-
- public enum MessageKind {
- JOB_START,
- JOB_FINISH
- }
-
- public MessageKind messageKind;
-
- public Message(JobId jobId, MessageKind msgKind) {
- this.jobId = jobId;
- this.messageKind = msgKind;
- }
- }
-
- @Override
- public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
- Set<IClusterManagementWork> workToBeDone = new HashSet<IClusterManagementWork>();
-
- Collection<FeedIntakeInfo> intakeInfos = feedJobNotificationHandler.getFeedIntakeInfos();
- Collection<FeedConnectJobInfo> connectJobInfos = feedJobNotificationHandler.getFeedConnectInfos();
-
- Map<String, List<FeedJobInfo>> impactedJobs = new HashMap<String, List<FeedJobInfo>>();
-
- for (String deadNode : deadNodeIds) {
- for (FeedIntakeInfo intakeInfo : intakeInfos) {
- if (intakeInfo.getIntakeLocation().contains(deadNode)) {
- List<FeedJobInfo> infos = impactedJobs.get(deadNode);
- if (infos == null) {
- infos = new ArrayList<FeedJobInfo>();
- impactedJobs.put(deadNode, infos);
- }
- infos.add(intakeInfo);
- intakeInfo.setState(FeedJobState.UNDER_RECOVERY);
- }
- }
-
- for (FeedConnectJobInfo connectInfo : connectJobInfos) {
- if (connectInfo.getStorageLocations().contains(deadNode)) {
- continue;
- }
- if (connectInfo.getComputeLocations().contains(deadNode)
- || connectInfo.getCollectLocations().contains(deadNode)) {
- List<FeedJobInfo> infos = impactedJobs.get(deadNode);
- if (infos == null) {
- infos = new ArrayList<FeedJobInfo>();
- impactedJobs.put(deadNode, infos);
- }
- infos.add(connectInfo);
- connectInfo.setState(FeedJobState.UNDER_RECOVERY);
- feedJobNotificationHandler.deregisterFeedActivity(connectInfo);
- }
- }
-
- }
-
- if (impactedJobs.size() > 0) {
- AddNodeWork addNodeWork = new AddNodeWork(deadNodeIds, deadNodeIds.size(), this);
- feedWorkRequestResponseHandler.registerFeedWork(addNodeWork.getWorkId(), impactedJobs);
- workToBeDone.add(addNodeWork);
- }
- return workToBeDone;
-
- }
-
- public static class FailureReport {
-
- private final List<Pair<FeedConnectJobInfo, List<String>>> recoverableConnectJobs;
- private final Map<IFeedJoint, List<String>> recoverableIntakeFeedIds;
-
- public FailureReport(Map<IFeedJoint, List<String>> recoverableIntakeFeedIds,
- List<Pair<FeedConnectJobInfo, List<String>>> recoverableSubscribers) {
- this.recoverableConnectJobs = recoverableSubscribers;
- this.recoverableIntakeFeedIds = recoverableIntakeFeedIds;
- }
-
- public List<Pair<FeedConnectJobInfo, List<String>>> getRecoverableSubscribers() {
- return recoverableConnectJobs;
- }
-
- public Map<IFeedJoint, List<String>> getRecoverableIntakeFeedIds() {
- return recoverableIntakeFeedIds;
- }
-
- }
-
- @Override
- public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
- ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(joinedNodeId + " joined the cluster. " + "Asterix state: " + newState);
- }
-
- boolean needToReActivateFeeds = !newState.equals(state) && (newState == ClusterState.ACTIVE);
- if (needToReActivateFeeds) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(joinedNodeId + " Resuming loser feeds (if any)");
- }
- try {
- FeedsActivator activator = new FeedsActivator();
- (new Thread(activator)).start();
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Exception in resuming feeds" + e.getMessage());
- }
- }
- state = newState;
- } else {
- List<FeedCollectInfo> feedsThatCanBeRevived = new ArrayList<FeedCollectInfo>();
- for (Entry<FeedCollectInfo, List<String>> entry : dependentFeeds.entrySet()) {
- List<String> requiredNodeIds = entry.getValue();
- if (requiredNodeIds.contains(joinedNodeId)) {
- requiredNodeIds.remove(joinedNodeId);
- if (requiredNodeIds.isEmpty()) {
- feedsThatCanBeRevived.add(entry.getKey());
- }
- }
- }
- if (!feedsThatCanBeRevived.isEmpty()) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(joinedNodeId + " Resuming feeds after rejoining of node " + joinedNodeId);
- }
- FeedsActivator activator = new FeedsActivator(feedsThatCanBeRevived);
- (new Thread(activator)).start();
- }
- }
- return null;
- }
-
- @Override
- public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
- try {
- responseInbox.put(response);
- } catch (InterruptedException e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Interrupted exception");
- }
- }
- }
-
- @Override
- public void notifyStateChange(ClusterState previousState, ClusterState newState) {
- switch (newState) {
- case ACTIVE:
- if (previousState.equals(ClusterState.UNUSABLE)) {
- try {
- // TODO: Figure out why code was commented
- // FeedsActivator activator = new FeedsActivator();
- // (new Thread(activator)).start();
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Exception in resuming feeds" + e.getMessage());
- }
- }
- }
- break;
- default:
- break;
- }
-
- }
-
- public static class FeedsDeActivator implements Runnable {
-
- private List<FeedConnectJobInfo> failedConnectjobs;
-
- public FeedsDeActivator(List<FeedConnectJobInfo> failedConnectjobs) {
- this.failedConnectjobs = failedConnectjobs;
- }
-
- @Override
- public void run() {
- for (FeedConnectJobInfo failedConnectJob : failedConnectjobs) {
- endFeed(failedConnectJob);
- }
- }
-
- private void endFeed(FeedConnectJobInfo cInfo) {
- MetadataTransactionContext ctx = null;
- PrintWriter writer = new PrintWriter(System.out, true);
- SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
-
- try {
- ctx = MetadataManager.INSTANCE.beginTransaction();
- FeedId feedId = cInfo.getConnectionId().getFeedId();
- DisconnectFeedStatement stmt = new DisconnectFeedStatement(new Identifier(feedId.getDataverse()),
- new Identifier(feedId.getFeedName()), new Identifier(cInfo.getConnectionId().getDatasetName()));
- List<Statement> statements = new ArrayList<Statement>();
- DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(feedId.getDataverse()));
- statements.add(dataverseDecl);
- statements.add(stmt);
- QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
- translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
- QueryTranslator.ResultDelivery.SYNC);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("End irrecoverable feed: " + cInfo.getConnectionId());
- }
- MetadataManager.INSTANCE.commitTransaction(ctx);
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Exception in ending loser feed: " + cInfo.getConnectionId() + " Exception "
- + e.getMessage());
- }
- e.printStackTrace();
- try {
- MetadataManager.INSTANCE.abortTransaction(ctx);
- } catch (Exception e2) {
- e2.addSuppressed(e);
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Exception in aborting transaction! System is in inconsistent state");
- }
- }
-
- }
-
- }
- }
-
- public void submitFeedConnectionRequest(IFeedJoint feedPoint, FeedConnectionRequest subscriptionRequest)
- throws Exception {
- feedJobNotificationHandler.submitFeedConnectionRequest(feedPoint, subscriptionRequest);
- }
-
- @Override
- public List<FeedConnectionId> getActiveFeedConnections(FeedId feedId) {
- List<FeedConnectionId> connections = new ArrayList<FeedConnectionId>();
- Collection<FeedConnectionId> activeConnections = feedJobNotificationHandler.getActiveFeedConnections();
- if (feedId != null) {
- for (FeedConnectionId connectionId : activeConnections) {
- if (connectionId.getFeedId().equals(feedId)) {
- connections.add(connectionId);
- }
- }
- } else {
- connections.addAll(activeConnections);
- }
- return connections;
- }
-
- @Override
- public List<String> getComputeLocations(FeedId feedId) {
- return feedJobNotificationHandler.getFeedComputeLocations(feedId);
- }
-
- @Override
- public List<String> getIntakeLocations(FeedId feedId) {
- return feedJobNotificationHandler.getFeedIntakeLocations(feedId);
- }
-
- @Override
- public List<String> getStoreLocations(FeedConnectionId feedConnectionId) {
- return feedJobNotificationHandler.getFeedStorageLocations(feedConnectionId);
- }
-
- @Override
- public List<String> getCollectLocations(FeedConnectionId feedConnectionId) {
- return feedJobNotificationHandler.getFeedCollectLocations(feedConnectionId);
- }
-
- @Override
- public boolean isFeedConnectionActive(FeedConnectionId connectionId) {
- return feedJobNotificationHandler.isFeedConnectionActive(connectionId);
- }
-
- public void reportPartialDisconnection(FeedConnectionId connectionId) {
- feedJobNotificationHandler.removeFeedJointsPostPipelineTermination(connectionId);
- }
-
- public void registerFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
- feedReportQueue.put(feedId, queue);
- }
-
- public void deregisterFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
- feedReportQueue.remove(feedId);
- }
-
- public LinkedBlockingQueue<String> getFeedReportQueue(FeedConnectionId feedId) {
- return feedReportQueue.get(feedId);
- }
-
- @Override
- public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
- return feedJobNotificationHandler.getAvailableFeedJoint(feedJointKey);
- }
-
- @Override
- public boolean isFeedJointAvailable(FeedJointKey feedJointKey) {
- return feedJobNotificationHandler.isFeedPointAvailable(feedJointKey);
- }
-
- public void registerFeedJoint(IFeedJoint feedJoint) {
- feedJobNotificationHandler.registerFeedJoint(feedJoint);
- }
-
- public IFeedJoint getFeedJoint(FeedJointKey feedJointKey) {
- return feedJobNotificationHandler.getFeedJoint(feedJointKey);
- }
-
- @Override
- public void registerFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
- feedJobNotificationHandler.registerFeedEventSubscriber(connectionId, subscriber);
- }
-
- @Override
- public void deregisterFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
- feedJobNotificationHandler.deregisterFeedEventSubscriber(connectionId, subscriber);
-
- }
-
- public JobSpecification getCollectJobSpecification(FeedConnectionId connectionId) {
- return feedJobNotificationHandler.getCollectJobSpecification(connectionId);
- }
-
- public JobId getFeedCollectJobId(FeedConnectionId connectionId) {
- return feedJobNotificationHandler.getFeedCollectJobId(connectionId);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/feed/FeedLoadManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedLoadManager.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedLoadManager.java
deleted file mode 100644
index 18e885d..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedLoadManager.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feed;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.feed.api.IFeedLoadManager;
-import org.apache.asterix.external.feed.api.IFeedTrackingManager;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.FeedCongestionMessage;
-import org.apache.asterix.external.feed.message.FeedReportMessage;
-import org.apache.asterix.external.feed.message.PrepareStallMessage;
-import org.apache.asterix.external.feed.message.ScaleInReportMessage;
-import org.apache.asterix.external.feed.message.TerminateDataFlowMessage;
-import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.feed.watch.FeedActivity;
-import org.apache.asterix.external.feed.watch.NodeLoadReport;
-import org.apache.asterix.external.feed.watch.FeedJobInfo.FeedJobState;
-import org.apache.asterix.file.FeedOperations;
-import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedLoadManager implements IFeedLoadManager {
-
- private static final Logger LOGGER = Logger.getLogger(FeedLoadManager.class.getName());
-
- private static final long MIN_MODIFICATION_INTERVAL = 180000; // 10 seconds
- private final TreeSet<NodeLoadReport> nodeReports;
- private final Map<FeedConnectionId, FeedActivity> feedActivities;
- private final Map<String, Pair<Integer, Integer>> feedMetrics;
-
- private FeedConnectionId lastModified;
- private long lastModifiedTimestamp;
-
- private static final int UNKNOWN = -1;
-
- public FeedLoadManager() {
- this.nodeReports = new TreeSet<NodeLoadReport>();
- this.feedActivities = new HashMap<FeedConnectionId, FeedActivity>();
- this.feedMetrics = new HashMap<String, Pair<Integer, Integer>>();
- }
-
- @Override
- public void submitNodeLoadReport(NodeLoadReport report) {
- nodeReports.remove(report);
- nodeReports.add(report);
- }
-
- @Override
- public void reportCongestion(FeedCongestionMessage message) throws AsterixException {
- FeedRuntimeId runtimeId = message.getRuntimeId();
- FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
- if (jobState == null
- || (jobState.equals(FeedJobState.UNDER_RECOVERY))
- || (message.getConnectionId().equals(lastModified) && System.currentTimeMillis()
- - lastModifiedTimestamp < MIN_MODIFICATION_INTERVAL)) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Ignoring congestion report from " + runtimeId);
- }
- return;
- } else {
- try {
- FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
- int inflowRate = message.getInflowRate();
- int outflowRate = message.getOutflowRate();
- List<String> currentComputeLocations = new ArrayList<String>();
- currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message
- .getConnectionId().getFeedId()));
- int computeCardinality = currentComputeLocations.size();
- int requiredCardinality = (int) Math
- .ceil((double) ((computeCardinality * inflowRate) / (double) outflowRate)) + 5;
- int additionalComputeNodes = requiredCardinality - computeCardinality;
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("INCREASING COMPUTE CARDINALITY from " + computeCardinality + " by "
- + additionalComputeNodes);
- }
-
- List<String> helperComputeNodes = getNodeForSubstitution(additionalComputeNodes);
-
- // Step 1) Alter the original feed job to adjust the cardinality
- JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
- .getConnectionId());
- helperComputeNodes.addAll(currentComputeLocations);
- List<String> newLocations = new ArrayList<String>();
- newLocations.addAll(currentComputeLocations);
- newLocations.addAll(helperComputeNodes);
- FeedMetadataUtil.increaseCardinality(jobSpec, FeedRuntimeType.COMPUTE, requiredCardinality, newLocations);
-
- // Step 2) send prepare to stall message
- gracefullyTerminateDataFlow(message.getConnectionId(), Integer.MAX_VALUE);
-
- // Step 3) run the altered job specification
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("New Job after adjusting to the workload " + jobSpec);
- }
-
- Thread.sleep(10000);
- runJob(jobSpec, false);
- lastModified = message.getConnectionId();
- lastModifiedTimestamp = System.currentTimeMillis();
-
- } catch (Exception e) {
- e.printStackTrace();
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Unable to form the required job for scaling in/out" + e.getMessage());
- }
- throw new AsterixException(e);
- }
- }
- }
-
- @Override
- public void submitScaleInPossibleReport(ScaleInReportMessage message) throws Exception {
- FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
- if (jobState == null || (jobState.equals(FeedJobState.UNDER_RECOVERY))) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("JobState information for job " + "[" + message.getConnectionId() + "]" + " not found ");
- }
- return;
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Processing scale-in message " + message);
- }
- FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
- JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
- .getConnectionId());
- int reducedCardinality = message.getReducedCardinaliy();
- List<String> currentComputeLocations = new ArrayList<String>();
- currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message.getConnectionId()
- .getFeedId()));
- FeedMetadataUtil.decreaseComputeCardinality(jobSpec, FeedRuntimeType.COMPUTE, reducedCardinality,
- currentComputeLocations);
-
- gracefullyTerminateDataFlow(message.getConnectionId(), reducedCardinality - 1);
- Thread.sleep(3000);
- JobId newJobId = runJob(jobSpec, false);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Launch modified job" + "[" + newJobId + "]" + "for scale-in \n" + jobSpec);
- }
-
- }
- }
-
- private void gracefullyTerminateDataFlow(FeedConnectionId connectionId, int computePartitionRetainLimit)
- throws Exception {
- // Step 1) send prepare to stall message
- PrepareStallMessage stallMessage = new PrepareStallMessage(connectionId, computePartitionRetainLimit);
- List<String> intakeLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
- List<String> computeLocations = FeedLifecycleListener.INSTANCE.getComputeLocations(connectionId.getFeedId());
- List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
-
- Set<String> operatorLocations = new HashSet<String>();
-
- operatorLocations.addAll(intakeLocations);
- operatorLocations.addAll(computeLocations);
- operatorLocations.addAll(storageLocations);
-
- JobSpecification messageJobSpec = FeedOperations.buildPrepareStallMessageJob(stallMessage, operatorLocations);
- runJob(messageJobSpec, true);
-
- // Step 2)
- TerminateDataFlowMessage terminateMesg = new TerminateDataFlowMessage(connectionId);
- messageJobSpec = FeedOperations.buildTerminateFlowMessageJob(terminateMesg, intakeLocations);
- runJob(messageJobSpec, true);
- }
-
- public static JobId runJob(JobSpecification spec, boolean waitForCompletion) throws Exception {
- IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
- JobId jobId = hcc.startJob(spec);
- if (waitForCompletion) {
- hcc.waitForCompletion(jobId);
- }
- return jobId;
- }
-
- @Override
- public void submitFeedRuntimeReport(FeedReportMessage report) {
- String key = "" + report.getConnectionId() + ":" + report.getRuntimeId().getFeedRuntimeType();
- Pair<Integer, Integer> value = feedMetrics.get(key);
- if (value == null) {
- value = new Pair<Integer, Integer>(report.getValue(), 1);
- feedMetrics.put(key, value);
- } else {
- value.first = value.first + report.getValue();
- value.second = value.second + 1;
- }
- }
-
- @Override
- public int getOutflowRate(FeedConnectionId connectionId, FeedRuntimeType runtimeType) {
- int rVal;
- String key = "" + connectionId + ":" + runtimeType;
- feedMetrics.get(key);
- Pair<Integer, Integer> value = feedMetrics.get(key);
- if (value == null) {
- rVal = UNKNOWN;
- } else {
- rVal = value.first / value.second;
- }
- return rVal;
- }
-
- private List<String> getNodeForSubstitution(int nRequired) {
- List<String> nodeIds = new ArrayList<String>();
- Iterator<NodeLoadReport> it = null;
- int nAdded = 0;
- while (nAdded < nRequired) {
- it = nodeReports.iterator();
- while (it.hasNext()) {
- nodeIds.add(it.next().getNodeId());
- nAdded++;
- }
- }
- return nodeIds;
- }
-
- @Override
- public synchronized List<String> getNodes(int required) {
- Iterator<NodeLoadReport> it;
- List<String> allocated = new ArrayList<String>();
- while (allocated.size() < required) {
- it = nodeReports.iterator();
- while (it.hasNext() && allocated.size() < required) {
- allocated.add(it.next().getNodeId());
- }
- }
- return allocated;
- }
-
- @Override
- public void reportThrottlingEnabled(ThrottlingEnabledFeedMessage mesg) throws AsterixException, Exception {
- System.out.println("Throttling Enabled for " + mesg.getConnectionId() + " " + mesg.getFeedRuntimeId());
- FeedConnectionId connectionId = mesg.getConnectionId();
- List<String> destinationLocations = new ArrayList<String>();
- List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
- List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
-
- destinationLocations.addAll(storageLocations);
- destinationLocations.addAll(collectLocations);
- JobSpecification messageJobSpec = FeedOperations.buildNotifyThrottlingEnabledMessageJob(mesg,
- destinationLocations);
- runJob(messageJobSpec, true);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.warning("Acking disabled for " + mesg.getConnectionId() + " in view of activated throttling");
- }
- IFeedTrackingManager trackingManager = CentralFeedManager.getInstance().getFeedTrackingManager();
- trackingManager.disableAcking(connectionId);
- }
-
- @Override
- public void reportFeedActivity(FeedConnectionId connectionId, FeedActivity activity) {
- feedActivities.put(connectionId, activity);
- }
-
- @Override
- public FeedActivity getFeedActivity(FeedConnectionId connectionId) {
- return feedActivities.get(connectionId);
- }
-
- @Override
- public Collection<FeedActivity> getFeedActivities() {
- return feedActivities.values();
- }
-
- @Override
- public void removeFeedActivity(FeedConnectionId connectionId) {
- feedActivities.remove(connectionId);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java
deleted file mode 100644
index 4ae2e59..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feed;
-
-import java.util.logging.Level;
-
-import org.apache.asterix.external.feed.api.IFeedLoadManager;
-import org.apache.asterix.external.feed.api.IFeedTrackingManager;
-import org.apache.asterix.external.feed.api.IFeedMessage.MessageType;
-import org.apache.asterix.external.feed.message.FeedCongestionMessage;
-import org.apache.asterix.external.feed.message.FeedReportMessage;
-import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
-import org.apache.asterix.external.feed.message.MessageReceiver;
-import org.apache.asterix.external.feed.message.ScaleInReportMessage;
-import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
-import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
-import org.apache.asterix.external.feed.watch.NodeLoadReport;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.feed.CentralFeedManager.AQLExecutor;
-import org.apache.asterix.hyracks.bootstrap.FeedBootstrap;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.json.JSONObject;
-
-public class FeedMessageReceiver extends MessageReceiver<String> {
-
- private static boolean initialized;
-
- private final IFeedLoadManager feedLoadManager;
- private final IFeedTrackingManager feedTrackingManager;
-
- public FeedMessageReceiver(CentralFeedManager centralFeedManager) {
- this.feedLoadManager = centralFeedManager.getFeedLoadManager();
- this.feedTrackingManager = centralFeedManager.getFeedTrackingManager();
- }
-
- @Override
- public void processMessage(String message) throws Exception {
- JSONObject obj = new JSONObject(message);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Received message " + obj);
- }
- MessageType messageType = MessageType.valueOf(obj.getString(FeedConstants.MessageConstants.MESSAGE_TYPE));
- switch (messageType) {
- case XAQL:
- if (!initialized) {
- FeedBootstrap.setUpInitialArtifacts();
- initialized = true;
- }
- AQLExecutor.executeAQL(obj.getString(FeedConstants.MessageConstants.AQL));
- break;
- case CONGESTION:
- feedLoadManager.reportCongestion(FeedCongestionMessage.read(obj));
- break;
- case FEED_REPORT:
- feedLoadManager.submitFeedRuntimeReport(FeedReportMessage.read(obj));
- break;
- case NODE_REPORT:
- feedLoadManager.submitNodeLoadReport(NodeLoadReport.read(obj));
- break;
- case SCALE_IN_REQUEST:
- feedLoadManager.submitScaleInPossibleReport(ScaleInReportMessage.read(obj));
- break;
- case STORAGE_REPORT:
- FeedLifecycleListener.INSTANCE.updateTrackingInformation(StorageReportFeedMessage.read(obj));
- break;
- case COMMIT_ACK:
- feedTrackingManager.submitAckReport(FeedTupleCommitAckMessage.read(obj));
- break;
- case THROTTLING_ENABLED:
- feedLoadManager.reportThrottlingEnabled(ThrottlingEnabledFeedMessage.read(obj));
- default:
- break;
- }
-
- }
-
- @Override
- public void emptyInbox() throws HyracksDataException {
- }
-}