You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:31:57 UTC
[03/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
deleted file mode 100644
index 5085087..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.feeds.CollectionRuntime;
-import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.common.feeds.IngestionRuntime;
-import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager.State;
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.common.feeds.api.ISubscriberRuntime;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-/**
- * The runtime for @see{FeedIntakeOperationDescriptor}.
- * Provides the core functionality to set up the artifacts for ingestion of a feed.
- * The artifacts are lazily activated when a feed receives a subscription request.
- */
-public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-
- private static Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName());
-
- private final FeedId feedId;
- private final int partition;
- private final IFeedSubscriptionManager feedSubscriptionManager;
- private final IFeedManager feedManager;
- private final IHyracksTaskContext ctx;
- private final IAdapterFactory adapterFactory;
-
- private IngestionRuntime ingestionRuntime;
- private IDataSourceAdapter adapter;
- private IIntakeProgressTracker tracker;
- private DistributeFeedFrameWriter feedFrameWriter;
-
- /**
-  * Creates the intake runtime for one partition of a feed.
-  *
-  * @param ctx task context; also used to reach the application's feed manager
-  * @param feedId feed being ingested
-  * @param adapterFactory factory used by {@code initialize()} to build the adapter when no
-  *        ingestion runtime was supplied
-  * @param partition partition index served by this operator instance
-  * @param ingestionRuntime runtime to resume, or {@code null} to have {@code initialize()}
-  *        create a new one
-  * @param policyAccessor not read by this constructor
-  */
- public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, IAdapterFactory adapterFactory,
-         int partition, IngestionRuntime ingestionRuntime, FeedPolicyAccessor policyAccessor) {
-     Object applicationObject = ctx.getJobletContext().getApplicationContext().getApplicationObject();
-     IAsterixAppRuntimeContext appRuntimeContext = (IAsterixAppRuntimeContext) applicationObject;
-     this.feedSubscriptionManager = appRuntimeContext.getFeedManager().getFeedSubscriptionManager();
-     this.feedManager = appRuntimeContext.getFeedManager();
-     this.adapterFactory = adapterFactory;
-     this.ingestionRuntime = ingestionRuntime;
-     this.partition = partition;
-     this.feedId = feedId;
-     this.ctx = ctx;
- }
-
- /**
-  * Sets up (or resumes) ingestion for this partition and blocks until the adapter reports
-  * {@code FINISHED_INGESTION} or {@code FAILED_INGESTION}.
-  * <p>
-  * When no ingestion runtime was supplied, an adapter, a distributing frame writer and a new
-  * {@link IngestionRuntime} are created, the runtime is registered with the subscription
-  * manager and the frame writer is opened. Otherwise a runtime in state
-  * {@code INACTIVE_INGESTION} is switched to {@code ACTIVE_INGESTION} and its adapter and
-  * frame writer are reused; a runtime in any other state is rejected.
-  * <p>
-  * If the wait is interrupted, subscribers whose policy does not continue on hardware failure
-  * are unsubscribed. If at least one subscriber does continue, the runtime is parked in
-  * {@code INACTIVE_INGESTION} and the method returns normally without closing the frame writer.
-  *
-  * @throws HyracksDataException if the adapter cannot be created, ingestion ends in
-  *         {@code FAILED_INGESTION}, the wait is interrupted and no subscriber continues on
-  *         failure, or any other exception is raised (including the rejection above)
-  */
- @Override
- public void initialize() throws HyracksDataException {
-     IAdapterRuntimeManager adapterRuntimeManager = null;
-     try {
-         if (ingestionRuntime == null) {
-             try {
-                 adapter = adapterFactory.createAdapter(ctx, partition);
-                 //TODO: Fix record tracking
-                 // if (adapterFactory.isRecordTrackingEnabled()) {
-                 // tracker = adapterFactory.createIntakeProgressTracker();
-                 // }
-             } catch (Exception e) {
-                 LOGGER.severe("Unable to create adapter : " + adapterFactory.getAlias() + "[" + partition + "]"
-                         + " Exception " + e);
-                 throw new HyracksDataException(e);
-             }
-             FrameTupleAccessor fta = new FrameTupleAccessor(recordDesc);
-             feedFrameWriter = new DistributeFeedFrameWriter(ctx, feedId, writer, FeedRuntimeType.INTAKE, partition,
-                     fta, feedManager);
-             adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, tracker, feedFrameWriter, partition);
-             SubscribableFeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
-                     partition);
-             ingestionRuntime = new IngestionRuntime(feedId, runtimeId, feedFrameWriter, recordDesc,
-                     adapterRuntimeManager);
-             feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
-             feedFrameWriter.open();
-         } else {
-             IAdapterRuntimeManager existingManager = ingestionRuntime.getAdapterRuntimeManager();
-             if (existingManager.getState().equals(State.INACTIVE_INGESTION)) {
-                 // Keep a reference to the resumed manager: the wait below synchronizes on it, and
-                 // leaving it null made every resume fail with a NullPointerException.
-                 adapterRuntimeManager = existingManager;
-                 adapterRuntimeManager.setState(State.ACTIVE_INGESTION);
-                 adapter = adapterRuntimeManager.getFeedAdapter();
-                 if (LOGGER.isLoggable(Level.INFO)) {
-                     LOGGER.info(" Switching to " + State.ACTIVE_INGESTION + " for ingestion runtime "
-                             + ingestionRuntime);
-                     LOGGER.info(" Adaptor " + adapter.getClass().getName() + "[" + partition + "]"
-                             + " connected to backend for feed " + feedId);
-                 }
-                 feedFrameWriter = ingestionRuntime.getFeedFrameWriter();
-             } else {
-                 String message = "Feed Ingestion Runtime for feed " + feedId
-                         + " is already registered and is active!.";
-                 LOGGER.severe(message);
-                 throw new IllegalStateException(message);
-             }
-         }
-
-         waitTillIngestionIsOver(adapterRuntimeManager);
-         feedSubscriptionManager
-                 .deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime.getRuntimeId());
-         if (adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)) {
-             throw new HyracksDataException("Unable to ingest data");
-         }
-
-     } catch (InterruptedException ie) {
-         /*
-          * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another node involved in the Hyracks job.
-          * As the Intake job involves only the intake operator, the exception is indicative of a failure at the sibling intake operator location.
-          * The surviving intake partitions must continue to live and receive data from the external source.
-          */
-         List<ISubscriberRuntime> subscribers = ingestionRuntime.getSubscribers();
-         FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(new HashMap<String, String>());
-         boolean needToHandleFailure = false;
-         List<ISubscriberRuntime> failingSubscribers = new ArrayList<ISubscriberRuntime>();
-         for (ISubscriberRuntime subscriber : subscribers) {
-             policyAccessor.reset(subscriber.getFeedPolicy());
-             if (!policyAccessor.continueOnHardwareFailure()) {
-                 failingSubscribers.add(subscriber);
-             } else {
-                 needToHandleFailure = true;
-             }
-         }
-
-         // Unsubscribing is best effort: one failing subscriber must not stop the others.
-         for (ISubscriberRuntime failingSubscriber : failingSubscribers) {
-             try {
-                 ingestionRuntime.unsubscribeFeed((CollectionRuntime) failingSubscriber);
-             } catch (Exception e) {
-                 if (LOGGER.isLoggable(Level.WARNING)) {
-                     LOGGER.warning(
-                             "Excpetion in unsubscribing " + failingSubscriber + " message " + e.getMessage());
-                 }
-             }
-         }
-
-         if (needToHandleFailure) {
-             ingestionRuntime.getAdapterRuntimeManager().setState(State.INACTIVE_INGESTION);
-             if (LOGGER.isLoggable(Level.INFO)) {
-                 LOGGER.info("Switching to " + State.INACTIVE_INGESTION + " on occurrence of failure.");
-             }
-         } else {
-             if (LOGGER.isLoggable(Level.INFO)) {
-                 LOGGER.info(
-                         "Interrupted Exception. None of the subscribers need to handle failures. Shutting down feed ingestion");
-             }
-             feedSubscriptionManager
-                     .deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime.getRuntimeId());
-             throw new HyracksDataException(ie);
-         }
-     } catch (Exception e) {
-         // Report through the class logger instead of printing to stderr.
-         LOGGER.log(Level.SEVERE, "Feed intake failed for feed " + feedId + "[" + partition + "]", e);
-         throw new HyracksDataException(e);
-     } finally {
-         if (ingestionRuntime != null
-                 && !ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {
-             // The writer is still null when the supplied runtime was rejected as already active;
-             // closing it unguarded would replace the real failure with a NullPointerException.
-             if (feedFrameWriter != null) {
-                 feedFrameWriter.close();
-                 if (LOGGER.isLoggable(Level.INFO)) {
-                     LOGGER.info("Closed Frame Writer " + feedFrameWriter + " adapter state "
-                             + ingestionRuntime.getAdapterRuntimeManager().getState());
-                 }
-             }
-         } else {
-             if (LOGGER.isLoggable(Level.INFO)) {
-                 LOGGER.info("Ending intake operator node pushable in state " + State.INACTIVE_INGESTION
-                         + " Will resume after correcting failure");
-             }
-         }
-
-     }
- }
-
- /**
-  * Blocks the calling thread until the given manager reports {@code FINISHED_INGESTION} or
-  * {@code FAILED_INGESTION}. The state is re-checked after every wake-up while holding the
-  * manager's monitor.
-  *
-  * @param adapterRuntimeManager manager whose state is polled and whose monitor is waited on
-  * @throws InterruptedException if the thread is interrupted while waiting
-  */
- private void waitTillIngestionIsOver(IAdapterRuntimeManager adapterRuntimeManager) throws InterruptedException {
-     if (LOGGER.isLoggable(Level.INFO)) {
-         LOGGER.info("Waiting for adaptor [" + partition + "]to be done with ingestion of feed " + feedId);
-     }
-     synchronized (adapterRuntimeManager) {
-         for (;;) {
-             boolean finished = adapterRuntimeManager.getState()
-                     .equals(IAdapterRuntimeManager.State.FINISHED_INGESTION);
-             if (finished
-                     || adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)) {
-                 break;
-             }
-             adapterRuntimeManager.wait();
-         }
-     }
-     if (LOGGER.isLoggable(Level.INFO)) {
-         String adaptorName = adapter.getClass().getName();
-         LOGGER.info(" Adaptor " + adaptorName + "[" + partition + "] done with ingestion of feed " + feedId);
-     }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java
deleted file mode 100644
index 2add90d..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IFeedLifecycleEventSubscriber;
-
-/**
- * Collects feed lifecycle events in an in-memory queue and lets a caller block until an
- * expected event has been delivered.
- */
-public class FeedLifecycleEventSubscriber implements IFeedLifecycleEventSubscriber {
-
-    /** Events delivered through {@link #handleFeedEvent} and not yet taken by {@link #assertEvent}. */
-    private final LinkedBlockingQueue<FeedLifecycleEvent> inbox;
-
-    public FeedLifecycleEventSubscriber() {
-        this.inbox = new LinkedBlockingQueue<FeedLifecycleEvent>();
-    }
-
-    /**
-     * Appends the event to the inbox.
-     *
-     * @param event event to record
-     */
-    @Override
-    public void handleFeedEvent(FeedLifecycleEvent event) {
-        inbox.add(event);
-    }
-
-    /**
-     * Returns once {@code event} has been observed.
-     * <p>
-     * Events already queued are inspected first without being removed; each of them is checked
-     * for failure. If the expected event is not among them, further events are taken from the
-     * queue (blocking as needed) until it arrives.
-     *
-     * @param event the event to wait for
-     * @throws AsterixException if a queued event is a failure event, or a failure event other
-     *         than {@code event} is taken while waiting
-     * @throws InterruptedException if interrupted while blocked on the queue
-     */
-    @Override
-    public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException {
-        boolean eventOccurred = false;
-        FeedLifecycleEvent e = null;
-        Iterator<FeedLifecycleEvent> eventsSoFar = inbox.iterator();
-        while (eventsSoFar.hasNext()) {
-            e = eventsSoFar.next();
-            assertNoFailure(e);
-            // Accumulate instead of overwrite: previously a later, different event reset the flag
-            // and the match found earlier in the queue was forgotten.
-            eventOccurred = eventOccurred || e.equals(event);
-        }
-
-        while (!eventOccurred) {
-            e = inbox.take();
-            eventOccurred = e.equals(event);
-            if (!eventOccurred) {
-                assertNoFailure(e);
-            }
-        }
-    }
-
-    /**
-     * Rejects the two failure events.
-     *
-     * @throws AsterixException if {@code e} is {@code FEED_INTAKE_FAILURE} or {@code FEED_COLLECT_FAILURE}
-     */
-    private void assertNoFailure(FeedLifecycleEvent e) throws AsterixException {
-        if (e.equals(FeedLifecycleEvent.FEED_INTAKE_FAILURE) || e.equals(FeedLifecycleEvent.FEED_COLLECT_FAILURE)) {
-            throw new AsterixException("Failure in feed");
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
deleted file mode 100644
index a1e9917..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.api.IFeedMessage;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-/**
- * Operator descriptor that carries a feed control message for a given feed connection and
- * hands both to a {@link FeedMessageOperatorNodePushable} when the runtime is created.
- */
-public class FeedMessageOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-
-    /** Message passed to every runtime created from this descriptor. */
-    private final IFeedMessage feedMessage;
-
-    /** Connection passed to every runtime created from this descriptor. */
-    private final FeedConnectionId connectionId;
-
-    /**
-     * @param spec job specification this operator belongs to
-     * @param connectionId feed connection the message is addressed to
-     * @param feedMessage message to deliver
-     */
-    public FeedMessageOperatorDescriptor(JobSpecification spec, FeedConnectionId connectionId,
-            IFeedMessage feedMessage) {
-        // NOTE(review): 0 and 1 are presumably the input and output arity - confirm against the superclass.
-        super(spec, 0, 1);
-        this.feedMessage = feedMessage;
-        this.connectionId = connectionId;
-    }
-
-    /**
-     * Builds the runtime that delivers the message on one partition.
-     *
-     * @param recordDescProvider not used by this operator
-     */
-    @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        FeedMessageOperatorNodePushable messageRuntime = new FeedMessageOperatorNodePushable(ctx, connectionId,
-                feedMessage, partition, nPartitions);
-        return messageRuntime;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
deleted file mode 100644
index 313fa1a..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.feeds.CollectionRuntime;
-import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
-import org.apache.asterix.common.feeds.FeedCollectRuntimeInputHandler;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedFrameCollector;
-import org.apache.asterix.common.feeds.FeedFrameCollector.State;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedRuntime;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
-import org.apache.asterix.common.feeds.FeedRuntimeManager;
-import org.apache.asterix.common.feeds.FeedTupleCommitResponseMessage;
-import org.apache.asterix.common.feeds.IngestionRuntime;
-import org.apache.asterix.common.feeds.IntakePartitionStatistics;
-import org.apache.asterix.common.feeds.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
-import org.apache.asterix.common.feeds.StorageSideMonitoredBuffer;
-import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedMessage;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-import org.apache.asterix.common.feeds.message.EndFeedMessage;
-import org.apache.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-/**
- * Runtime for the FeedMessageOperatorDescriptor. This operator is responsible for communicating
- * a feed message to the local feed manager on the host node controller.
- *
- * @see FeedMessageOperatorDescriptor
- * IFeedMessage
- * IFeedManager
- */
-public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-
- private static final Logger LOGGER = Logger.getLogger(FeedMessageOperatorNodePushable.class.getName());
-
- private final FeedConnectionId connectionId;
- private final IFeedMessage message;
- private final IFeedManager feedManager;
- private final int partition;
-
- /**
-  * @param ctx task context, used to reach the application's feed manager
-  * @param connectionId feed connection the message refers to
-  * @param feedMessage message to handle in {@code initialize()}
-  * @param partition partition index of this operator instance
-  * @param nPartitions not read by this constructor
-  */
- public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId connectionId,
-         IFeedMessage feedMessage, int partition, int nPartitions) {
-     Object applicationObject = ctx.getJobletContext().getApplicationContext().getApplicationObject();
-     this.feedManager = ((IAsterixAppRuntimeContext) applicationObject).getFeedManager();
-     this.partition = partition;
-     this.message = feedMessage;
-     this.connectionId = connectionId;
- }
-
- /**
-  * Opens the output writer, dispatches the message to the handler for its type and closes the
-  * writer again, whether or not handling succeeded. Message types without a handler, and END
-  * messages whose end type is neither DISCONNECT_FEED nor DISCONTINUE_SOURCE, are ignored.
-  *
-  * @throws HyracksDataException wrapping any exception raised while opening the writer or
-  *         handling the message
-  */
- @Override
- public void initialize() throws HyracksDataException {
-     try {
-         writer.open();
-         switch (message.getMessageType()) {
-             case END:
-                 EndFeedMessage endMessage = (EndFeedMessage) message;
-                 switch (endMessage.getEndMessageType()) {
-                     case DISCONNECT_FEED:
-                         hanldeDisconnectFeedTypeMessage(endMessage);
-                         break;
-                     case DISCONTINUE_SOURCE:
-                         handleDiscontinueFeedTypeMessage(endMessage);
-                         break;
-                 }
-                 break;
-             case PREPARE_STALL:
-                 handlePrepareStallMessage((PrepareStallMessage) message);
-                 break;
-             case TERMINATE_FLOW:
-                 TerminateDataFlowMessage terminateMessage = (TerminateDataFlowMessage) message;
-                 handleTerminateFlowMessage(terminateMessage.getConnectionId());
-                 break;
-             case COMMIT_ACK_RESPONSE:
-                 handleFeedTupleCommitResponseMessage((FeedTupleCommitResponseMessage) message);
-                 break;
-             case THROTTLING_ENABLED:
-                 handleThrottlingEnabledMessage((ThrottlingEnabledFeedMessage) message);
-                 break;
-             default:
-                 break;
-         }
-     } catch (Exception e) {
-         throw new HyracksDataException(e);
-     } finally {
-         writer.close();
-     }
- }
-
- /**
-  * Turns acking off on the monitored buffer of every STORE runtime of the connection named in
-  * the message.
-  *
-  * @param throttlingMessage message identifying the affected connection
-  */
- private void handleThrottlingEnabledMessage(ThrottlingEnabledFeedMessage throttlingMessage) {
-     FeedConnectionId throttledConnection = throttlingMessage.getConnectionId();
-     FeedRuntimeManager manager = feedManager.getFeedConnectionManager()
-             .getFeedRuntimeManager(throttledConnection);
-     for (FeedRuntimeId candidateId : manager.getFeedRuntimes()) {
-         if (!candidateId.getFeedRuntimeType().equals(FeedRuntimeType.STORE)) {
-             continue;
-         }
-         FeedRuntime storeRuntime = manager.getFeedRuntime(candidateId);
-         StorageSideMonitoredBuffer storageBuffer = (StorageSideMonitoredBuffer) (storeRuntime.getInputHandler()
-                 .getmBuffer());
-         storageBuffer.setAcking(false);
-         if (LOGGER.isLoggable(Level.INFO)) {
-             LOGGER.info("Acking Disabled in view of throttling that has been activted upfron in the pipeline "
-                     + throttledConnection);
-         }
-     }
- }
-
- /**
-  * Propagates a commit acknowledgement to the runtimes of the acknowledged connection.
-  * <p>
-  * COLLECT runtimes drop buffered data up to {@code ACK_WINDOW_SIZE * (maxWindowAcked + 1)};
-  * STORE runtimes forward the message to their storage time-tracking task. Finally, if an
-  * intake runtime is registered for this operator's partition and has a progress tracker, the
-  * tracker is notified with the current wall-clock time.
-  *
-  * @param commitResponseMessage acknowledgement to distribute
-  */
- private void handleFeedTupleCommitResponseMessage(FeedTupleCommitResponseMessage commitResponseMessage) {
-     FeedConnectionId ackedConnection = commitResponseMessage.getConnectionId();
-     FeedRuntimeManager manager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(ackedConnection);
-     Set<FeedRuntimeId> runtimeIds = manager.getFeedRuntimes();
-     for (FeedRuntimeId id : runtimeIds) {
-         FeedRuntime feedRuntime = manager.getFeedRuntime(id);
-         switch (id.getFeedRuntimeType()) {
-             case COLLECT:
-                 ((FeedCollectRuntimeInputHandler) feedRuntime.getInputHandler()).dropTill(
-                         IntakePartitionStatistics.ACK_WINDOW_SIZE * (commitResponseMessage.getMaxWindowAcked() + 1));
-                 break;
-             case STORE:
-                 feedRuntime.getInputHandler().getmBuffer().getStorageTimeTrackingRateTask()
-                         .receiveCommitAckResponse(commitResponseMessage);
-                 break;
-             default:
-                 break;
-         }
-     }
-
-     // NOTE(review): the result is discarded and the lookup below uses this operator's own
-     // partition - confirm whether the intake partition from the message was intended.
-     commitResponseMessage.getIntakePartition();
-     SubscribableFeedRuntimeId intakeId = new SubscribableFeedRuntimeId(ackedConnection.getFeedId(),
-             FeedRuntimeType.INTAKE, partition);
-     IngestionRuntime intakeRuntime = (IngestionRuntime) feedManager.getFeedSubscriptionManager()
-             .getSubscribableRuntime(intakeId);
-     if (intakeRuntime == null) {
-         return;
-     }
-     IIntakeProgressTracker progressTracker = intakeRuntime.getAdapterRuntimeManager().getProgressTracker();
-     if (progressTracker != null) {
-         progressTracker.notifyIngestedTupleTimestamp(System.currentTimeMillis());
-     }
- }
-
- /**
-  * Moves the frame collector of every COLLECT runtime of the connection to the HANDOVER state.
-  *
-  * @param connectionId connection whose data flow is being terminated
-  * @throws HyracksDataException if the connection has no COLLECT runtime
-  */
- private void handleTerminateFlowMessage(FeedConnectionId connectionId) throws HyracksDataException {
-     FeedRuntimeManager manager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
-     Set<FeedRuntimeId> runtimeIds = manager.getFeedRuntimes();
-
-     int handedOver = 0;
-     for (FeedRuntimeId id : runtimeIds) {
-         FeedRuntime candidate = manager.getFeedRuntime(id);
-         if (!candidate.getRuntimeId().getRuntimeType().equals(FeedRuntimeType.COLLECT)) {
-             continue;
-         }
-         ((CollectionRuntime) candidate).getFrameCollector().setState(State.HANDOVER);
-         handedOver++;
-         if (LOGGER.isLoggable(Level.INFO)) {
-             LOGGER.info("Switched " + candidate + " to Hand Over stage");
-         }
-     }
-     if (handedOver == 0) {
-         throw new HyracksDataException("COLLECT Runtime not found!");
-     }
- }
-
- /**
-  * Sets the mode of every runtime of the connection ahead of a stall. COMPUTE runtimes whose
-  * partition exceeds the retain limit carried by the message are set to END; every other
-  * runtime is set to STALL.
-  *
-  * @param prepareStallMessage message naming the connection and the compute-partition retain limit
-  */
- private void handlePrepareStallMessage(PrepareStallMessage prepareStallMessage) throws HyracksDataException {
-     FeedConnectionId stalledConnection = prepareStallMessage.getConnectionId();
-     int retainLimit = prepareStallMessage.getComputePartitionsRetainLimit();
-     FeedRuntimeManager manager = feedManager.getFeedConnectionManager()
-             .getFeedRuntimeManager(stalledConnection);
-     for (FeedRuntimeId id : manager.getFeedRuntimes()) {
-         FeedRuntime feedRuntime = manager.getFeedRuntime(id);
-         Mode targetMode = Mode.STALL;
-         if (id.getFeedRuntimeType().equals(FeedRuntimeType.COMPUTE) && id.getPartition() > retainLimit) {
-             targetMode = Mode.END;
-         }
-         feedRuntime.setMode(targetMode);
-     }
- }
-
- /**
-  * Stops the adapter of the intake runtime registered for the message's source feed on this
-  * operator's partition.
-  *
-  * @param endFeedMessage message naming the source feed to discontinue
-  * @throws IllegalStateException if no intake runtime is registered for that feed and partition
-  * @throws Exception if stopping the adapter fails
-  */
- private void handleDiscontinueFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
-     FeedId sourceFeedId = endFeedMessage.getSourceFeedId();
-     SubscribableFeedRuntimeId subscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
-             FeedRuntimeType.INTAKE, partition);
-     ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager()
-             .getSubscribableRuntime(subscribableRuntimeId);
-     // The commit-response handler in this class treats a null lookup result as possible; fail
-     // here with a descriptive error rather than a bare NullPointerException.
-     if (feedRuntime == null) {
-         throw new IllegalStateException("No intake runtime registered for feed " + sourceFeedId + " partition "
-                 + partition + "; cannot discontinue source");
-     }
-     IAdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime) feedRuntime).getAdapterRuntimeManager();
-     adapterRuntimeManager.stop();
-     if (LOGGER.isLoggable(Level.INFO)) {
-         LOGGER.info("Stopped Adapter " + adapterRuntimeManager);
-     }
- }
-
- /**
-  * Disconnects a feed connection on this partition.
-  * <p>
-  * For a complete disconnection the collect-side runtime of the connection (COLLECT when the
-  * source is INTAKE, COMPUTE_COLLECT when it is COMPUTE) is unsubscribed from its source
-  * runtime. Otherwise, for a COMPUTE source, the registered reader whose connection id equals
-  * the one in the message is unsubscribed from the compute runtime's distributing writer.
-  *
-  * @param endFeedMessage message describing the disconnection
-  * @throws IllegalStateException for a complete disconnection with a source type other than
-  *         INTAKE or COMPUTE, or a partial disconnection with source type INTAKE
-  * @throws Exception if unsubscribing fails
-  */
- private void hanldeDisconnectFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
-     if (LOGGER.isLoggable(Level.INFO)) {
-         LOGGER.info("Ending feed:" + endFeedMessage.getFeedConnectionId());
-     }
-     // Read from the message field, as before, rather than from the parameter.
-     FeedRuntimeType sourceType = ((EndFeedMessage) message).getSourceRuntimeType();
-     if (endFeedMessage.isCompleteDisconnection()) {
-         // sourceType is the location at which the feed connection receives data
-         FeedRuntimeType collectType;
-         switch (sourceType) {
-             case INTAKE:
-                 collectType = FeedRuntimeType.COLLECT;
-                 break;
-             case COMPUTE:
-                 collectType = FeedRuntimeType.COMPUTE_COLLECT;
-                 break;
-             default:
-                 throw new IllegalStateException("Invalid subscribable runtime type " + sourceType);
-         }
-
-         FeedRuntimeId collectRuntimeId = new FeedRuntimeId(collectType, partition,
-                 FeedRuntimeId.DEFAULT_OPERAND_ID);
-         CollectionRuntime collectRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
-                 .getFeedRuntime(connectionId, collectRuntimeId);
-         collectRuntime.getSourceRuntime().unsubscribeFeed(collectRuntime);
-         if (LOGGER.isLoggable(Level.INFO)) {
-             LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
-         }
-     } else {
-         // sourceType is the location for data hand-off in presence of subscribers
-         switch (sourceType) {
-             case INTAKE:
-                 // data hand-off from one feed to another does not happen at intake
-                 throw new IllegalStateException("Illegal State, invalid runtime type " + sourceType);
-             case COMPUTE:
-                 // feed could be primary or secondary, doesn't matter
-                 SubscribableFeedRuntimeId computeRuntimeId = new SubscribableFeedRuntimeId(
-                         connectionId.getFeedId(), FeedRuntimeType.COMPUTE, partition);
-                 ISubscribableRuntime computeRuntime = feedManager.getFeedSubscriptionManager()
-                         .getSubscribableRuntime(computeRuntimeId);
-                 DistributeFeedFrameWriter distributor = computeRuntime.getFeedFrameWriter();
-                 Map<IFrameWriter, FeedFrameCollector> readers = distributor.getRegisteredReaders();
-
-                 // Locate the first reader that belongs to the connection being ended.
-                 IFrameWriter departingWriter = null;
-                 for (Entry<IFrameWriter, FeedFrameCollector> reader : readers.entrySet()) {
-                     FeedRuntimeInputHandler handler = (FeedRuntimeInputHandler) reader.getKey();
-                     if (handler.getConnectionId().equals(endFeedMessage.getFeedConnectionId())) {
-                         departingWriter = handler;
-                         break;
-                     }
-                 }
-                 if (departingWriter != null) {
-                     distributor.unsubscribeFeed(departingWriter);
-                     if (LOGGER.isLoggable(Level.INFO)) {
-                         LOGGER.info("Partial Unsubscription of " + departingWriter);
-                     }
-                 }
-                 break;
-             default:
-                 break;
-         }
-
-     }
-
-     if (LOGGER.isLoggable(Level.INFO)) {
-         LOGGER.info("Unsubscribed from feed :" + connectionId);
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
deleted file mode 100644
index f833019..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedRuntime;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
-import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.SubscribableRuntime;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IActivity;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-
-/**
- * Wrapper ("meta") runtime used for the COMPUTE stage of a feed ingestion pipeline.
- * Incoming frames are handed to a {@link FeedRuntimeInputHandler} that is constructed
- * around the wrapped core operator; the core operator's output is routed through a
- * {@link DistributeFeedFrameWriter} to which this operator's own writer is subscribed.
- * <p>
- * NOTE: this IFrameWriter doesn't follow the IFrameWriter contract; in particular
- * {@link #close()} logs failures instead of propagating them.
- */
-public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMetaComputeNodePushable.class.getName());
-
-    /** Runtime node pushable corresponding to the core feed operator **/
-    private final AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
-
-    /**
-     * A policy enforcer that ensures dynamic decisions for a feed are taken
-     * in accordance with the associated ingestion policy
-     **/
-    private final FeedPolicyEnforcer policyEnforcer;
-
-    /**
-     * The Feed Runtime instance associated with the operator. Feed Runtime
-     * captures the state of the operator while the feed is active.
-     * Remains null until {@link #open()} has looked it up or created it.
-     */
-    private FeedRuntime feedRuntime;
-
-    /**
-     * A unique identifier for the feed instance. A feed instance represents
-     * the flow of data from a feed to a dataset.
-     **/
-    private final FeedConnectionId connectionId;
-
-    /**
-     * Denotes the i'th operator instance in a setting where K operator
-     * instances are scheduled to run in parallel
-     **/
-    private final int partition;
-
-    /** Total number of partitions, as passed to the constructor **/
-    private final int nPartitions;
-
-    /** The (singleton) instance of IFeedManager **/
-    private final IFeedManager feedManager;
-
-    private FrameTupleAccessor fta;
-
-    private final IHyracksTaskContext ctx;
-
-    private final FeedRuntimeType runtimeType = FeedRuntimeType.COMPUTE;
-
-    /** Handler that receives every incoming frame; null until {@link #open()} succeeds **/
-    private FeedRuntimeInputHandler inputSideHandler;
-
-    /**
-     * Creates the push runtime of the wrapped core operator and resolves the feed manager.
-     *
-     * @param ctx task context the operator runs in
-     * @param recordDescProvider forwarded to the core operator's createPushRuntime
-     * @param partition index of this operator instance
-     * @param nPartitions total number of operator instances
-     * @param coreOperator descriptor of the wrapped operator; must also be an IActivity
-     * @param feedConnectionId identifier of the feed connection this operator belongs to
-     * @param feedPolicyProperties ingestion policy properties handed to the policy enforcer
-     * @param operationId accepted for signature parity with the other meta runtimes; not used here
-     * @throws HyracksDataException if the core operator's runtime cannot be created
-     */
-    public FeedMetaComputeNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
-            Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
-        this.ctx = ctx;
-        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
-                .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
-        this.partition = partition;
-        this.nPartitions = nPartitions;
-        this.connectionId = feedConnectionId;
-        // Resolve the feed manager once; the same lookup used to be performed twice in a row.
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        this.feedManager = runtimeCtx.getFeedManager();
-    }
-
-    /**
-     * Looks up the feed runtime for this connection/partition, creating a new one or
-     * re-attaching to the existing one, then opens the downstream writer and the core operator.
-     *
-     * @throws HyracksDataException wrapping any failure raised during set-up
-     */
-    @Override
-    public void open() throws HyracksDataException {
-        FeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(connectionId.getFeedId(), runtimeType, partition);
-        try {
-            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
-            if (feedRuntime == null) {
-                initializeNewFeedRuntime(runtimeId);
-            } else {
-                reviveOldFeedRuntime(runtimeId);
-            }
-            writer.open();
-            coreOperator.open();
-        } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "Unable to open feed runtime " + runtimeId, e);
-            throw new HyracksDataException(e);
-        }
-    }
-
-    /**
-     * Builds a fresh input handler, distribute writer and subscribable runtime, registers the
-     * runtime with both the subscription manager and the connection manager, and subscribes
-     * this operator's writer to the distribute writer.
-     */
-    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.fta = new FrameTupleAccessor(recordDesc);
-        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
-                policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager, nPartitions);
-
-        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
-                writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
-        coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
-
-        feedRuntime = new SubscribableRuntime(connectionId.getFeedId(), runtimeId, inputSideHandler, distributeWriter,
-                recordDesc);
-        feedManager.getFeedSubscriptionManager().registerFeedSubscribableRuntime((ISubscribableRuntime) feedRuntime);
-        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
-
-        distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
-    }
-
-    /**
-     * Re-attaches to a runtime that is already registered with the connection manager: reuses
-     * its input handler with the new core operator, wires a new distribute writer, resets the
-     * handler and switches the runtime back to PROCESS mode.
-     */
-    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.fta = new FrameTupleAccessor(recordDesc);
-        this.inputSideHandler = feedRuntime.getInputHandler();
-        this.inputSideHandler.setCoreOperator(coreOperator);
-
-        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
-                writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
-        coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
-        distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
-
-        inputSideHandler.reset(nPartitions);
-        feedRuntime.setMode(Mode.PROCESS);
-    }
-
-    /**
-     * Forwards the frame to the input handler.
-     *
-     * @throws HyracksDataException wrapping any failure raised by the handler
-     */
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        try {
-            inputSideHandler.nextFrame(buffer);
-        } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "Failure while processing a frame in " + coreOperator.getDisplayName(), e);
-            throw new HyracksDataException(e);
-        }
-    }
-
-    /** Marks the feed runtime (when one exists) as failed and propagates the failure to the core operator. */
-    @Override
-    public void fail() throws HyracksDataException {
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Core Op:" + coreOperator.getDisplayName() + " fail ");
-        }
-        // feedRuntime is still null when open() failed before the runtime was resolved.
-        if (feedRuntime != null) {
-            feedRuntime.setMode(Mode.FAIL);
-        }
-        coreOperator.fail();
-    }
-
-    /**
-     * Signals end of data to the input handler (unless it is stalled or already ended), waits
-     * for it to finish, closes the core operator and finally deregisters the runtime unless the
-     * handler is stalled. Failures are logged rather than rethrown, as before.
-     */
-    @Override
-    public void close() throws HyracksDataException {
-        // The handler is null when open() failed early; the mode must not be read in that case.
-        boolean hasHandler = inputSideHandler != null;
-        boolean stalled = hasHandler && inputSideHandler.getMode().equals(Mode.STALL);
-        boolean end = hasHandler && inputSideHandler.getMode().equals(Mode.END);
-        try {
-            if (hasHandler) {
-                if (stalled || end) {
-                    inputSideHandler.setFinished(true);
-                } else {
-                    inputSideHandler.nextFrame(null); // signal end of data
-                    awaitInputHandlerFinished();
-                }
-            }
-            coreOperator.close();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("CLOSED " + coreOperator + " STALLED ?" + stalled + " ENDED " + end);
-            }
-        } catch (InterruptedException e) {
-            // Keep the interruption visible to the caller's thread instead of losing it.
-            Thread.currentThread().interrupt();
-            LOGGER.log(Level.WARNING, "Interrupted while closing " + coreOperator, e);
-        } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "Failure while closing " + coreOperator, e);
-        } finally {
-            String runtimeDescription = describeRuntime();
-            if (!stalled) {
-                deregister();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("DEREGISTERING " + runtimeDescription);
-                }
-            } else if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("NOT DEREGISTERING " + runtimeDescription);
-            }
-            if (hasHandler) {
-                inputSideHandler.close();
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Ending Operator " + runtimeDescription);
-            }
-        }
-    }
-
-    /** Blocks on the core operator's monitor until the input handler reports that it has finished. */
-    private void awaitInputHandlerFinished() throws InterruptedException {
-        while (!inputSideHandler.isFinished()) {
-            synchronized (coreOperator) {
-                coreOperator.wait();
-            }
-        }
-    }
-
-    /** Returns the runtime id as text, or a placeholder when no runtime was ever resolved. */
-    private String describeRuntime() {
-        return feedRuntime == null ? "<no feed runtime>" : String.valueOf(feedRuntime.getRuntimeId());
-    }
-
-    /** Removes the runtime, if any, from the subscription manager and the connection manager. */
-    private void deregister() {
-        if (feedRuntime != null) {
-            // deregister from subscription manager
-            SubscribableFeedRuntimeId runtimeId = (SubscribableFeedRuntimeId) feedRuntime.getRuntimeId();
-            feedManager.getFeedSubscriptionManager().deregisterFeedSubscribableRuntime(runtimeId);
-
-            // deregister from connection manager
-            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
-        }
-    }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaNodePushable.java
deleted file mode 100644
index 86f8750..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaNodePushable.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedRuntime;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IActivity;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-
-/**
- * Wrapper ("meta") runtime for feed pipeline operators whose runtime type is OTHER.
- * Incoming frames are handed to a {@link FeedRuntimeInputHandler} constructed around the
- * wrapped core operator, whose output is connected directly to this operator's writer.
- */
-public class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMetaNodePushable.class.getName());
-
-    /** Runtime node pushable corresponding to the core feed operator **/
-    private final AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
-
-    /**
-     * A policy enforcer that ensures dynamic decisions for a feed are taken
-     * in accordance with the associated ingestion policy
-     **/
-    private final FeedPolicyEnforcer policyEnforcer;
-
-    /**
-     * The Feed Runtime instance associated with the operator. Feed Runtime
-     * captures the state of the operator while the feed is active.
-     * Remains null until {@link #open()} has looked it up or created it.
-     */
-    private FeedRuntime feedRuntime;
-
-    /**
-     * A unique identifier for the feed instance. A feed instance represents
-     * the flow of data from a feed to a dataset.
-     **/
-    private final FeedConnectionId connectionId;
-
-    /**
-     * Denotes the i'th operator instance in a setting where K operator
-     * instances are scheduled to run in parallel
-     **/
-    private final int partition;
-
-    /** Total number of partitions available **/
-    private final int nPartitions;
-
-    /** Type associated with the core feed operator **/
-    private final FeedRuntimeType runtimeType = FeedRuntimeType.OTHER;
-
-    /** The (singleton) instance of IFeedManager **/
-    private final IFeedManager feedManager;
-
-    private FrameTupleAccessor fta;
-
-    private final IHyracksTaskContext ctx;
-
-    /** Operand identifier used, with type and partition, to build the feed runtime id **/
-    private final String operandId;
-
-    /** The pre-processor associated with this runtime; null until {@link #open()} succeeds **/
-    private FeedRuntimeInputHandler inputSideHandler;
-
-    /**
-     * Creates the push runtime of the wrapped core operator and resolves the feed manager.
-     *
-     * @param ctx task context the operator runs in
-     * @param recordDescProvider forwarded to the core operator's createPushRuntime
-     * @param partition index of this operator instance
-     * @param nPartitions total number of operator instances
-     * @param coreOperator descriptor of the wrapped operator; must also be an IActivity
-     * @param feedConnectionId identifier of the feed connection this operator belongs to
-     * @param feedPolicyProperties ingestion policy properties handed to the policy enforcer
-     * @param operationId operand identifier that becomes part of the feed runtime id
-     * @throws HyracksDataException if the core operator's runtime cannot be created
-     */
-    public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition,
-            int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
-            Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
-        this.ctx = ctx;
-        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
-                .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
-        this.partition = partition;
-        this.nPartitions = nPartitions;
-        this.connectionId = feedConnectionId;
-        // Resolve the feed manager once; the same lookup used to be performed twice in a row.
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        this.feedManager = runtimeCtx.getFeedManager();
-        this.operandId = operationId;
-    }
-
-    /**
-     * Looks up the feed runtime for this connection/partition/operand, creating a new one or
-     * re-attaching to the existing one, then opens the core operator.
-     *
-     * @throws HyracksDataException wrapping any failure raised during set-up
-     */
-    @Override
-    public void open() throws HyracksDataException {
-        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
-        try {
-            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
-            if (feedRuntime == null) {
-                initializeNewFeedRuntime(runtimeId);
-            } else {
-                reviveOldFeedRuntime(runtimeId);
-            }
-            coreOperator.open();
-        } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "Unable to open feed runtime " + runtimeId, e);
-            throw new HyracksDataException(e);
-        }
-    }
-
-    /** Builds a fresh input handler around the core operator and creates the feed runtime. */
-    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.fta = new FrameTupleAccessor(recordDesc);
-        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
-                policyEnforcer.getFeedPolicyAccessor(), false, fta, recordDesc, feedManager, nPartitions);
-
-        setupBasicRuntime(inputSideHandler);
-    }
-
-    /**
-     * Re-attaches to a runtime already known to the connection manager: reuses its input
-     * handler, reconnects the core operator's output and switches the runtime to PROCESS mode.
-     */
-    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.inputSideHandler = feedRuntime.getInputHandler();
-        this.fta = new FrameTupleAccessor(recordDesc);
-        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
-        feedRuntime.setMode(Mode.PROCESS);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Retreived state from the zombie instance " + runtimeType + " node.");
-        }
-    }
-
-    /** Connects the core operator to this operator's writer and creates the feed runtime object. */
-    private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
-        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
-        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
-        feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
-    }
-
-    /**
-     * Forwards the frame to the input handler.
-     *
-     * @throws HyracksDataException wrapping any failure raised by the handler
-     */
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        try {
-            inputSideHandler.nextFrame(buffer);
-        } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "Failure while processing a frame in " + coreOperator.getDisplayName(), e);
-            throw new HyracksDataException(e);
-        }
-    }
-
-    /** Marks the feed runtime (when one exists) as failed and propagates the failure to the core operator. */
-    @Override
-    public void fail() throws HyracksDataException {
-        // Log at the level that is being checked; info() here silently dropped the message
-        // whenever the logger was configured at WARNING.
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Core Op:" + coreOperator.getDisplayName() + " fail ");
-        }
-        // feedRuntime is still null when open() failed before the runtime was resolved.
-        if (feedRuntime != null) {
-            feedRuntime.setMode(Mode.FAIL);
-        }
-        coreOperator.fail();
-    }
-
-    /**
-     * Closes the core operator and then the input handler. A failure while closing the core
-     * operator is logged and otherwise ignored, as before.
-     */
-    @Override
-    public void close() throws HyracksDataException {
-        try {
-            coreOperator.close();
-        } catch (Exception e) {
-            // deliberately not rethrown
-            LOGGER.log(Level.WARNING, "Failure while closing " + coreOperator, e);
-        } finally {
-            if (inputSideHandler != null) {
-                inputSideHandler.close();
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                // feedRuntime may never have been resolved if open() failed.
-                LOGGER.info("Ending Operator "
-                        + (feedRuntime == null ? "<no feed runtime>" : String.valueOf(feedRuntime.getRuntimeId())));
-            }
-        }
-    }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
deleted file mode 100644
index 5d88a9e..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-/**
- * FeedMetaOperatorDescriptor is a wrapper operator that provides a sandbox-like
- * environment for a Hyracks operator that is part of a feed ingestion pipeline.
- * The MetaFeed operator offers an interface identical to that of the wrapped
- * operator, hereafter referred to as the core operator, so that, as seen by
- * Hyracks, the altered pipeline is identical to the one formed from the core
- * operators alone. The MetaFeed operator enhances each core operator with
- * functionality for handling runtime exceptions, saving state for future
- * retrieval, and measuring/reporting performance characteristics; this added
- * functionality is what contributes to fault tolerance.
- */
-
-public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-
-    /** The actual (Hyracks) operator that the MetaFeed operator wraps. **/
-    private IOperatorDescriptor coreOperator;
-
-    /**
-     * Identifies the feed instance, i.e. the flow of data from a feed to a
-     * dataset.
-     **/
-    private final FeedConnectionId feedConnectionId;
-
-    /** The policy associated with the feed instance. **/
-    private final Map<String, String> feedPolicyProperties;
-
-    /**
-     * Type of the feed runtime associated with the operator; selects which
-     * node pushable {@link #createPushRuntime} builds.
-     **/
-    private final FeedRuntimeType runtimeType;
-
-    /** Operand identifier handed to the node pushables that are created. **/
-    private final String operandId;
-
-    /**
-     * Wraps the given core operator, adopting its input/output arity and, when it
-     * has exactly one output, its output record descriptor.
-     *
-     * @param spec job specification the descriptor is added to
-     * @param feedConnectionId identifier of the feed connection
-     * @param coreOperatorDescriptor the operator being wrapped
-     * @param feedPolicyProperties ingestion policy properties
-     * @param runtimeType type of feed runtime to create for this operator
-     * @param enableSubscriptionMode accepted but not read by this constructor
-     * @param operandId operand identifier passed on to the created runtimes
-     */
-    public FeedMetaOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
-            IOperatorDescriptor coreOperatorDescriptor, Map<String, String> feedPolicyProperties,
-            FeedRuntimeType runtimeType, boolean enableSubscriptionMode, String operandId) {
-        super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
-        this.coreOperator = coreOperatorDescriptor;
-        this.feedConnectionId = feedConnectionId;
-        this.feedPolicyProperties = feedPolicyProperties;
-        this.runtimeType = runtimeType;
-        this.operandId = operandId;
-        if (coreOperatorDescriptor.getOutputRecordDescriptors().length == 1) {
-            recordDescriptors[0] = coreOperatorDescriptor.getOutputRecordDescriptors()[0];
-        }
-    }
-
-    /**
-     * Creates the node pushable matching {@link #getRuntimeType()}.
-     *
-     * @return the created runtime; {@code null} for the JOIN runtime type
-     * @throws HyracksDataException if the runtime type is not handled, or if
-     *             creating the runtime fails
-     */
-    @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        switch (runtimeType) {
-            case COMPUTE:
-                return new FeedMetaComputeNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
-                        feedConnectionId, feedPolicyProperties, operandId);
-            case STORE:
-                return new FeedMetaStoreNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
-                        feedConnectionId, feedPolicyProperties, operandId);
-            case OTHER:
-                return new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
-                        feedConnectionId, feedPolicyProperties, operandId);
-            case ETS:
-                // no feed wrapper here: the core operator's own runtime is returned directly
-                return ((AlgebricksMetaOperatorDescriptor) coreOperator).createPushRuntime(ctx, recordDescProvider,
-                        partition, nPartitions);
-            case JOIN:
-                // NOTE(review): no runtime is created for JOIN, so callers receive null - confirm this is intended
-                return null;
-            default:
-                throw new HyracksDataException(new IllegalArgumentException("Invalid feed runtime: " + runtimeType));
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "FeedMeta [" + coreOperator + " ]";
-    }
-
-    /** @return the wrapped core operator descriptor */
-    public IOperatorDescriptor getCoreOperator() {
-        return coreOperator;
-    }
-
-    /** @return the feed runtime type this descriptor was created with */
-    public FeedRuntimeType getRuntimeType() {
-        return runtimeType;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
deleted file mode 100644
index b409745..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedRuntime;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IActivity;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-
-/**
- * Wrapper ("meta") runtime used for the STORE stage of a feed ingestion pipeline.
- * Incoming frames are handed to a {@link FeedRuntimeInputHandler} constructed around the
- * wrapped core operator, and the feed runtime is registered with the feed connection manager.
- * As before, {@link #close()} logs failures instead of propagating them.
- */
-public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMetaStoreNodePushable.class.getName());
-
-    /** Runtime node pushable corresponding to the core feed operator **/
-    private final AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
-
-    /**
-     * A policy enforcer that ensures dynamic decisions for a feed are taken
-     * in accordance with the associated ingestion policy
-     **/
-    private final FeedPolicyEnforcer policyEnforcer;
-
-    /**
-     * The Feed Runtime instance associated with the operator. Feed Runtime
-     * captures the state of the operator while the feed is active.
-     * Remains null until {@link #open()} has looked it up or created it.
-     */
-    private FeedRuntime feedRuntime;
-
-    /**
-     * A unique identifier for the feed instance. A feed instance represents
-     * the flow of data from a feed to a dataset.
-     **/
-    private final FeedConnectionId connectionId;
-
-    /**
-     * Denotes the i'th operator instance in a setting where K operator
-     * instances are scheduled to run in parallel
-     **/
-    private final int partition;
-
-    /** Total number of partitions, as passed to the constructor **/
-    private final int nPartitions;
-
-    /** Type associated with the core feed operator **/
-    private final FeedRuntimeType runtimeType = FeedRuntimeType.STORE;
-
-    /** The (singleton) instance of IFeedManager **/
-    private final IFeedManager feedManager;
-
-    private FrameTupleAccessor fta;
-
-    private final IHyracksTaskContext ctx;
-
-    /** Operand identifier used, with type and partition, to build the feed runtime id **/
-    private final String operandId;
-
-    /** Handler that receives every incoming frame; null until {@link #open()} succeeds **/
-    private FeedRuntimeInputHandler inputSideHandler;
-
-    /**
-     * Creates the push runtime of the wrapped core operator and resolves the feed manager.
-     *
-     * @param ctx task context the operator runs in
-     * @param recordDescProvider forwarded to the core operator's createPushRuntime
-     * @param partition index of this operator instance
-     * @param nPartitions total number of operator instances
-     * @param coreOperator descriptor of the wrapped operator; must also be an IActivity
-     * @param feedConnectionId identifier of the feed connection this operator belongs to
-     * @param feedPolicyProperties ingestion policy properties handed to the policy enforcer
-     * @param operationId operand identifier that becomes part of the feed runtime id
-     * @throws HyracksDataException if the core operator's runtime cannot be created
-     */
-    public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
-            Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
-        this.ctx = ctx;
-        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
-                .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
-        this.partition = partition;
-        this.nPartitions = nPartitions;
-        this.connectionId = feedConnectionId;
-        // Resolve the feed manager once; the same lookup used to be performed twice in a row.
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        this.feedManager = runtimeCtx.getFeedManager();
-        this.operandId = operationId;
-    }
-
-    /**
-     * Looks up the feed runtime for this connection/partition/operand, creating a new one or
-     * re-attaching to the existing one, then opens the core operator.
-     *
-     * @throws HyracksDataException wrapping any failure raised during set-up
-     */
-    @Override
-    public void open() throws HyracksDataException {
-        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
-        try {
-            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
-            if (feedRuntime == null) {
-                initializeNewFeedRuntime(runtimeId);
-            } else {
-                reviveOldFeedRuntime(runtimeId);
-            }
-
-            coreOperator.open();
-        } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "Unable to open feed runtime " + runtimeId, e);
-            throw new HyracksDataException(e);
-        }
-    }
-
-    /**
-     * Builds a fresh input handler around the core operator, turns buffering off when the core
-     * operator is a non-primary LSM insert/delete operator, and registers a new feed runtime.
-     */
-    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Runtime not found for " + runtimeId + " connection id " + connectionId);
-        }
-        this.fta = new FrameTupleAccessor(recordDesc);
-        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
-                policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager, nPartitions);
-        if (coreOperator instanceof AsterixLSMInsertDeleteOperatorNodePushable) {
-            AsterixLSMInsertDeleteOperatorNodePushable indexOp = (AsterixLSMInsertDeleteOperatorNodePushable) coreOperator;
-            if (!indexOp.isPrimary()) {
-                inputSideHandler.setBufferingEnabled(false);
-            }
-        }
-        setupBasicRuntime(inputSideHandler);
-    }
-
-    /**
-     * Re-attaches to a runtime already registered with the connection manager: reuses and
-     * resets its input handler, points it at the new core operator and switches the runtime
-     * to PROCESS mode.
-     */
-    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.inputSideHandler = feedRuntime.getInputHandler();
-        this.fta = new FrameTupleAccessor(recordDesc);
-        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
-        this.inputSideHandler.reset(nPartitions);
-        this.inputSideHandler.setCoreOperator(coreOperator);
-        feedRuntime.setMode(Mode.PROCESS);
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Retreived state from the zombie instance from previous execution for " + runtimeType
-                    + " node.");
-        }
-    }
-
-    /** Connects the core operator to this operator's writer, then creates and registers the feed runtime. */
-    private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
-        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
-        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
-        feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
-        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
-    }
-
-    /**
-     * Forwards the frame to the input handler.
-     *
-     * @throws HyracksDataException wrapping any failure raised by the handler
-     */
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        try {
-            inputSideHandler.nextFrame(buffer);
-        } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "Failure while processing a frame in " + coreOperator.getDisplayName(), e);
-            throw new HyracksDataException(e);
-        }
-    }
-
-    /** Marks the feed runtime (when one exists) as failed and propagates the failure to the core operator. */
-    @Override
-    public void fail() throws HyracksDataException {
-        // Log at the level that is being checked; info() here silently dropped the message
-        // whenever the logger was configured at WARNING.
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Core Op:" + coreOperator.getDisplayName() + " fail ");
-        }
-        // feedRuntime is still null when open() failed before the runtime was resolved.
-        if (feedRuntime != null) {
-            feedRuntime.setMode(Mode.FAIL);
-        }
-        coreOperator.fail();
-    }
-
-    /**
-     * Unless the input handler is stalled, signals end of data and waits for the handler to
-     * finish; then closes the core operator. Finally the runtime is deregistered (unless
-     * stalled) and the handler is closed. Failures are logged rather than rethrown, as before.
-     */
-    @Override
-    public void close() throws HyracksDataException {
-        // Neither the runtime nor the handler exists when open() failed early, so both are
-        // checked before being dereferenced.
-        String runtimeDescription = describeRuntime();
-        boolean hasHandler = inputSideHandler != null;
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("CLOSE CALLED FOR " + runtimeDescription);
-        }
-        boolean stalled = hasHandler && inputSideHandler.getMode().equals(Mode.STALL);
-        try {
-            if (hasHandler && !stalled) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("SIGNALLING END OF DATA for " + runtimeDescription + " mode is "
-                            + inputSideHandler.getMode() + " WAITING ON " + coreOperator);
-                }
-                inputSideHandler.nextFrame(null); // signal end of data
-                awaitInputHandlerFinished();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("ABOUT TO CLOSE OPERATOR " + coreOperator);
-                }
-            }
-            coreOperator.close();
-        } catch (InterruptedException e) {
-            // Keep the interruption visible to the caller's thread instead of losing it.
-            Thread.currentThread().interrupt();
-            LOGGER.log(Level.WARNING, "Interrupted while closing " + coreOperator, e);
-        } catch (Exception e) {
-            // deliberately not rethrown
-            LOGGER.log(Level.WARNING, "Failure while closing " + coreOperator, e);
-        } finally {
-            if (!stalled) {
-                deregister();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("DEREGISTERING " + runtimeDescription);
-                }
-            } else if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("NOT DEREGISTERING " + runtimeDescription);
-            }
-            if (hasHandler) {
-                inputSideHandler.close();
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Ending Operator " + runtimeDescription);
-            }
-        }
-    }
-
-    /** Blocks on the core operator's monitor until the input handler reports that it has finished. */
-    private void awaitInputHandlerFinished() throws InterruptedException {
-        while (!inputSideHandler.isFinished()) {
-            synchronized (coreOperator) {
-                coreOperator.wait();
-            }
-        }
-    }
-
-    /** Returns the runtime id as text, or a placeholder when no runtime was ever resolved. */
-    private String describeRuntime() {
-        return feedRuntime == null ? "<no feed runtime>" : String.valueOf(feedRuntime.getRuntimeId());
-    }
-
-    /** Removes the runtime, if any, from the feed connection manager. */
-    private void deregister() {
-        if (feedRuntime != null) {
-            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
-        }
-    }
-
-}
\ No newline at end of file