You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/05/15 19:04:02 UTC
[9/9] incubator-asterixdb git commit: Cleanup Feed CodeBase
Cleanup Feed CodeBase
To expedite cleaning up the feed codebase and reaching a maintainable
state, we disabled the following:
1. Policies (At least once, throttling, discarding, elasticity).
2. Statistics Reporting.
3. Load management.
4. Feed re-activation upon System reboot.
Right now on master, none of these features work reliably.
We will re-introduce them one feature at a time.
The rules followed in this change:
1. Keep X if X is tested in a test case.
2. Remove X if X is not used in test cases.
After a few meetings with Mike and Till, the following policies
have been fixed and unit tests have been added for them:
1. Buffering
2. Spill
3. Discard
4. Throttle
Change-Id: I545bc4f8560564e4c868a80d27c77a4edd97a8b8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/798
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/fba622b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/fba622b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/fba622b3
Branch: refs/heads/master
Commit: fba622b3ae6e5850e9110212e66c410ce4f00359
Parents: 0716dc0
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Sat May 14 13:01:16 2016 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Sun May 15 12:03:00 2016 -0700
----------------------------------------------------------------------
.../api/common/AsterixAppRuntimeContext.java | 8 +-
.../api/http/servlet/ConnectorAPIServlet.java | 3 +-
.../asterix/api/http/servlet/FeedServlet.java | 39 +-
.../api/http/servlet/FeedServletUtil.java | 76 ---
.../app/external/CentralFeedManager.java | 110 ----
.../external/FeedJobNotificationHandler.java | 84 ---
.../app/external/FeedLifecycleListener.java | 282 ---------
.../asterix/app/external/FeedLoadManager.java | 300 ---------
.../app/external/FeedMessageReceiver.java | 96 ---
.../asterix/app/external/FeedOperations.java | 98 ---
.../app/external/FeedTrackingManager.java | 187 ------
.../app/external/FeedWorkCollection.java | 93 +--
.../FeedWorkRequestResponseHandler.java | 269 ---------
.../asterix/app/external/FeedsActivator.java | 118 ----
.../asterix/aql/translator/QueryTranslator.java | 12 +-
.../bootstrap/CCApplicationEntryPoint.java | 7 -
.../bootstrap/ClusterLifecycleListener.java | 8 +-
.../hyracks/bootstrap/FeedBootstrap.java | 62 --
.../bootstrap/GlobalRecoveryManager.java | 9 +-
.../http/servlet/ConnectorAPIServletTest.java | 3 +-
.../src/test/resources/runtimets/testsuite.xml | 44 +-
.../common/exceptions/IExceptionHandler.java | 43 ++
asterixdb/asterix-external-data/pom.xml | 41 +-
.../external/api/IAdapterRuntimeManager.java | 84 ---
.../dataflow/FeedRecordDataFlowController.java | 8 +-
.../external/dataflow/FeedTupleForwarder.java | 29 +-
.../external/feed/api/ICentralFeedManager.java | 34 --
.../external/feed/api/IExceptionHandler.java | 43 --
.../feed/api/IFeedConnectionManager.java | 2 +-
.../external/feed/api/IFeedFrameHandler.java | 39 --
.../IFeedLifecycleIntakeEventSubscriber.java | 28 -
.../feed/api/IFeedLifecycleListener.java | 3 +-
.../external/feed/api/IFeedLoadManager.java | 60 --
.../asterix/external/feed/api/IFeedManager.java | 72 ---
.../external/feed/api/IFeedMemoryComponent.java | 58 --
.../external/feed/api/IFeedMemoryManager.java | 58 --
.../asterix/external/feed/api/IFeedMessage.java | 13 +-
.../external/feed/api/IFeedMessageService.java | 34 --
.../external/feed/api/IFeedMetadataManager.java | 39 --
.../external/feed/api/IFeedMetricCollector.java | 50 --
.../api/IFeedOperatorOutputSideHandler.java | 49 --
.../external/feed/api/IFeedProvider.java | 26 -
.../asterix/external/feed/api/IFeedRuntime.java | 23 +-
.../asterix/external/feed/api/IFeedService.java | 26 -
.../feed/api/IFeedSubscriptionManager.java | 41 --
.../external/feed/api/IFeedTrackingManager.java | 29 -
.../external/feed/api/IFrameEventCallback.java | 32 -
.../external/feed/api/IFramePostProcessor.java | 28 -
.../external/feed/api/IFramePreprocessor.java | 26 -
.../feed/api/IIntakeProgressTracker.java | 29 -
.../external/feed/api/IMessageReceiver.java | 28 -
.../external/feed/api/ISubscribableRuntime.java | 28 +-
.../external/feed/api/ISubscriberRuntime.java | 3 -
.../feed/api/ISubscriptionProvider.java | 29 -
.../CollectTransformFeedFrameWriter.java | 125 ----
.../external/feed/dataflow/DataBucket.java | 89 ---
.../external/feed/dataflow/DataBucketPool.java | 110 ----
.../dataflow/DistributeFeedFrameWriter.java | 64 +-
.../FeedCollectRuntimeInputHandler.java | 57 --
.../feed/dataflow/FeedExceptionHandler.java | 25 +-
.../external/feed/dataflow/FeedFrameCache.java | 177 ------
.../feed/dataflow/FeedFrameCollector.java | 96 +--
.../feed/dataflow/FeedFrameDiscarder.java | 67 --
.../feed/dataflow/FeedFrameHandlers.java | 309 ----------
.../feed/dataflow/FeedFrameSpiller.java | 188 ------
.../feed/dataflow/FeedFrameTupleAccessor.java | 110 ----
.../feed/dataflow/FeedFrameTupleDecorator.java | 108 ----
.../feed/dataflow/FeedRuntimeInputHandler.java | 605 ++++++++-----------
.../external/feed/dataflow/FrameAction.java | 54 ++
.../external/feed/dataflow/FrameCollection.java | 101 ----
.../feed/dataflow/FrameDistributor.java | 422 ++++---------
.../feed/dataflow/FrameEventCallback.java | 103 ----
.../external/feed/dataflow/FrameSpiller.java | 217 +++++++
.../feed/dataflow/StorageFrameHandler.java | 119 ----
.../dataflow/SyncFeedRuntimeInputHandler.java | 72 +++
.../feed/management/ConcurrentFramePool.java | 204 +++++++
.../feed/management/FeedConnectionId.java | 9 +-
.../feed/management/FeedConnectionManager.java | 4 +-
.../external/feed/management/FeedManager.java | 84 +--
.../feed/management/FeedMemoryManager.java | 114 ----
.../feed/management/FeedMetadataManager.java | 112 ----
.../management/FeedSubscriptionManager.java | 76 ---
.../feed/message/FeedCongestionMessage.java | 102 ----
.../feed/message/FeedMessageService.java | 145 -----
.../feed/message/FeedReportMessage.java | 99 ---
.../feed/message/FeedTupleCommitAckMessage.java | 98 ---
.../message/FeedTupleCommitResponseMessage.java | 81 ---
.../external/feed/message/MessageListener.java | 126 ----
.../external/feed/message/MessageReceiver.java | 119 ----
.../feed/message/NodeReportMessage.java | 68 ---
.../feed/message/PrepareStallMessage.java | 68 ---
.../message/RemoteSocketMessageListener.java | 134 ----
.../feed/message/ScaleInReportMessage.java | 113 ----
.../feed/message/SocketMessageListener.java | 160 -----
.../feed/message/StorageReportFeedMessage.java | 128 ----
.../feed/message/TerminateDataFlowMessage.java | 52 --
.../message/ThrottlingEnabledFeedMessage.java | 85 ---
.../external/feed/message/XAQLFeedMessage.java | 66 --
.../external/feed/runtime/AdapterExecutor.java | 11 +-
.../feed/runtime/AdapterRuntimeManager.java | 63 +-
.../feed/runtime/CollectionRuntime.java | 26 +-
.../external/feed/runtime/FeedRuntime.java | 38 +-
.../external/feed/runtime/FeedRuntimeId.java | 28 +-
.../external/feed/runtime/IngestionRuntime.java | 60 +-
.../feed/runtime/SubscribableFeedRuntimeId.java | 53 --
.../feed/runtime/SubscribableRuntime.java | 45 +-
.../feed/watch/BasicMonitoredBuffer.java | 80 ---
.../feed/watch/ComputeSideMonitoredBuffer.java | 79 ---
.../feed/watch/FeedMetricCollector.java | 189 ------
.../feed/watch/IntakePartitionStatistics.java | 41 --
.../feed/watch/IntakeSideMonitoredBuffer.java | 80 ---
.../external/feed/watch/MonitoredBuffer.java | 401 ------------
.../feed/watch/MonitoredBufferTimerTasks.java | 299 ---------
.../asterix/external/feed/watch/NodeLoad.java | 62 --
.../external/feed/watch/NodeLoadReport.java | 100 ---
.../feed/watch/NodeLoadReportService.java | 107 ----
.../asterix/external/feed/watch/Series.java | 44 --
.../asterix/external/feed/watch/SeriesAvg.java | 47 --
.../asterix/external/feed/watch/SeriesRate.java | 92 ---
.../feed/watch/StorageSideMonitoredBuffer.java | 209 -------
.../input/stream/SocketServerInputStream.java | 4 +-
.../FeedCollectOperatorDescriptor.java | 38 +-
.../FeedCollectOperatorNodePushable.java | 160 +----
.../operators/FeedIntakeOperatorDescriptor.java | 23 +-
.../FeedIntakeOperatorNodePushable.java | 200 ++----
.../FeedMessageOperatorNodePushable.java | 187 +-----
.../operators/FeedMetaComputeNodePushable.java | 134 ++--
.../operators/FeedMetaNodePushable.java | 189 ------
.../operators/FeedMetaOperatorDescriptor.java | 11 -
.../operators/FeedMetaStoreNodePushable.java | 134 ++--
.../util/ExternalDataExceptionUtils.java | 18 +-
.../feed/test/FeedMemoryManagerUnitTest.java | 482 +++++++++++++++
.../external/feed/test/FeedSpillerUnitTest.java | 178 ++++++
.../metadata/declared/AqlMetadataProvider.java | 358 +++++------
.../metadata/feeds/FeedMetadataUtil.java | 75 +--
.../apache/hyracks/api/comm/IFrameWriter.java | 25 +-
.../api/context/IHyracksFrameMgrContext.java | 15 +-
.../api/context/IHyracksTaskContext.java | 2 +
.../org/apache/hyracks/control/nc/Task.java | 5 +
.../common/io/MessagingFrameTupleAppender.java | 17 +-
.../std/connectors/PartitionDataWriter.java | 8 +-
.../hyracks/test/support/TestTaskContext.java | 7 +-
142 files changed, 2318 insertions(+), 10614 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 8342be5..5b3e453 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -51,7 +51,6 @@ import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.external.feed.api.IFeedManager;
import org.apache.asterix.external.feed.management.FeedManager;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataNode;
@@ -120,7 +119,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
private IIOManager ioManager;
private boolean isShuttingdown;
- private IFeedManager feedManager;
+ private FeedManager feedManager;
private IReplicationChannel replicationChannel;
private IReplicationManager replicationManager;
@@ -173,7 +172,8 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
ioManager, ncApplicationContext.getNodeId(), metadataProperties);
- localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository();
+ localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
+ .createRepository();
IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProviderForRecovery(
this);
@@ -377,7 +377,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
}
@Override
- public IFeedManager getFeedManager() {
+ public FeedManager getFeedManager() {
return feedManager;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
index 79ce721..c70950a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
@@ -29,7 +29,6 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.apache.asterix.app.external.CentralFeedManager;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
@@ -84,7 +83,7 @@ public class ConnectorAPIServlet extends HttpServlet {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
// Retrieves file splits of the dataset.
- AqlMetadataProvider metadataProvider = new AqlMetadataProvider(null, CentralFeedManager.getInstance());
+ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(null);
metadataProvider.setMetadataTxnContext(mdTxnCtx);
Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
if (dataset == null) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
index eacee6d..8bc613a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
@@ -32,9 +32,6 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.apache.asterix.app.external.CentralFeedManager;
-import org.apache.asterix.external.feed.api.IFeedLoadManager;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.management.FeedId;
import org.apache.asterix.external.feed.watch.FeedActivity;
@@ -86,52 +83,26 @@ public class FeedServlet extends HttpServlet {
}
String outStr = null;
- if (requestURI.startsWith("/webui/static")) {
- outStr = sb.toString();
- } else {
- Collection<FeedActivity> lfa = CentralFeedManager.getInstance().getFeedLoadManager().getFeedActivities();
- StringBuilder ldStr = new StringBuilder();
- ldStr.append("<br />");
- ldStr.append("<br />");
- if (lfa == null || lfa.isEmpty()) {
- ldStr.append("Currently there are no active feeds in AsterixDB");
- } else {
- ldStr.append("Active Feeds");
- }
- insertTable(ldStr, lfa);
- outStr = String.format(sb.toString(), ldStr.toString());
-
- }
+ outStr = sb.toString();
PrintWriter out = response.getWriter();
out.println(outStr);
}
+ @SuppressWarnings("unused")
private void insertTable(StringBuilder html, Collection<FeedActivity> list) {
- html.append("<table style=\"width:100%\">");
- html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_FEED_NAME + "</th>");
- html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_DATASET_NAME + "</th>");
- html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_ACTIVE_SINCE + "</th>");
- html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_INTAKE_STAGE + "</th>");
- html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_COMPUTE_STAGE + "</th>");
- html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_STORE_STAGE + "</th>");
- html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_INTAKE_RATE + "</th>");
- html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_STORE_RATE + "</th>");
- for (FeedActivity activity : list) {
- insertRow(html, activity);
- }
}
+ @SuppressWarnings("null")
private void insertRow(StringBuilder html, FeedActivity activity) {
String intake = activity.getFeedActivityDetails().get(FeedActivityDetails.INTAKE_LOCATIONS);
String compute = activity.getFeedActivityDetails().get(FeedActivityDetails.COMPUTE_LOCATIONS);
String store = activity.getFeedActivityDetails().get(FeedActivityDetails.STORAGE_LOCATIONS);
- IFeedLoadManager loadManager = CentralFeedManager.getInstance().getFeedLoadManager();
FeedConnectionId connectionId = new FeedConnectionId(
new FeedId(activity.getDataverseName(), activity.getFeedName()), activity.getDatasetName());
- int intakeRate = loadManager.getOutflowRate(connectionId, FeedRuntimeType.COLLECT) * intake.split(",").length;
- int storeRate = loadManager.getOutflowRate(connectionId, FeedRuntimeType.STORE) * store.split(",").length;
+ int intakeRate = 0;
+ int storeRate = 0;
html.append("<tr>");
html.append("<td>" + activity.getFeedName() + "</td>");
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java
deleted file mode 100644
index 52a140d..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.api.http.servlet;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.nio.CharBuffer;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.app.external.FeedLifecycleListener;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.RemoteSocketMessageListener;
-
-public class FeedServletUtil {
-
- private static final Logger LOGGER = Logger.getLogger(FeedServletUtil.class.getName());
- private static final char EOL = (char) "\n".getBytes()[0];
-
- public static final class Constants {
- public static final String TABLE_HEADER_FEED_NAME = "Feed";
- public static final String TABLE_HEADER_DATASET_NAME = "Dataset";
- public static final String TABLE_HEADER_ACTIVE_SINCE = "Timestamp";
- public static final String TABLE_HEADER_INTAKE_STAGE = "Intake Stage";
- public static final String TABLE_HEADER_COMPUTE_STAGE = "Compute Stage";
- public static final String TABLE_HEADER_STORE_STAGE = "Store Stage";
- public static final String TABLE_HEADER_INTAKE_RATE = "Intake";
- public static final String TABLE_HEADER_STORE_RATE = "Store";
- }
-
- public static void initiateSubscription(FeedConnectionId feedId, String host, int port) throws IOException {
- LinkedBlockingQueue<String> outbox = new LinkedBlockingQueue<String>();
- int subscriptionPort = port + 1;
- Socket sc = new Socket(host, subscriptionPort);
- InputStream in = sc.getInputStream();
-
- CharBuffer buffer = CharBuffer.allocate(50);
- char ch = 0;
- while (ch != EOL) {
- buffer.put(ch);
- ch = (char) in.read();
- }
- buffer.flip();
- sc.close();
-
- String s = new String(buffer.array());
- int feedSubscriptionPort = Integer.parseInt(s.trim());
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Response from Super Feed Manager Report Service " + port + " will connect at " + host + " "
- + port);
- }
-
- // register the feed subscription queue with FeedLifecycleListener
- FeedLifecycleListener.INSTANCE.registerFeedReportQueue(feedId, outbox);
- RemoteSocketMessageListener listener = new RemoteSocketMessageListener(host, feedSubscriptionPort, outbox);
- listener.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/CentralFeedManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/CentralFeedManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/CentralFeedManager.java
deleted file mode 100644
index cab5e64..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/CentralFeedManager.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringReader;
-import java.util.List;
-
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.QueryTranslator;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.feed.api.ICentralFeedManager;
-import org.apache.asterix.external.feed.api.IFeedLoadManager;
-import org.apache.asterix.external.feed.api.IFeedTrackingManager;
-import org.apache.asterix.external.feed.message.SocketMessageListener;
-import org.apache.asterix.lang.aql.parser.AQLParserFactory;
-import org.apache.asterix.lang.common.base.IParser;
-import org.apache.asterix.lang.common.base.IParserFactory;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class CentralFeedManager implements ICentralFeedManager {
-
- private static final ICentralFeedManager centralFeedManager = new CentralFeedManager();
- private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
-
- public static ICentralFeedManager getInstance() {
- return centralFeedManager;
- }
-
- private final int port;
- private final IFeedLoadManager feedLoadManager;
- private final IFeedTrackingManager feedTrackingManager;
- private final SocketMessageListener messageListener;
-
- private CentralFeedManager() {
- this.port = AsterixAppContextInfo.getInstance().getFeedProperties().getFeedCentralManagerPort();
- this.feedLoadManager = new FeedLoadManager();
- this.feedTrackingManager = new FeedTrackingManager();
- this.messageListener = new SocketMessageListener(port, new FeedMessageReceiver(this));
- }
-
- @Override
- public void start() throws AsterixException {
- messageListener.start();
- }
-
- @Override
- public void stop() throws AsterixException, IOException {
- messageListener.stop();
- }
-
- public static JobId runJob(JobSpecification spec, boolean waitForCompletion) throws Exception {
- IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
- JobId jobId = hcc.startJob(spec);
- if (waitForCompletion) {
- hcc.waitForCompletion(jobId);
- }
- return jobId;
- }
-
- @Override
- public IFeedLoadManager getFeedLoadManager() {
- return feedLoadManager;
- }
-
- @Override
- public IFeedTrackingManager getFeedTrackingManager() {
- return feedTrackingManager;
- }
-
- public static class AQLExecutor {
-
- private static final PrintWriter out = new PrintWriter(System.out, true);
- private static final IParserFactory parserFactory = new AQLParserFactory();
-
- public static void executeAQL(String aql) throws Exception {
- IParser parser = parserFactory.createParser(new StringReader(aql));
- List<Statement> statements = parser.parse();
- SessionConfig pc = new SessionConfig(out, OutputFormat.ADM);
- QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
- translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
- QueryTranslator.ResultDelivery.SYNC);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
index a143578..cfc2125 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
@@ -21,7 +21,6 @@ package org.apache.asterix.app.external;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -40,15 +39,12 @@ import org.apache.asterix.external.feed.api.IFeedJoint;
import org.apache.asterix.external.feed.api.IFeedJoint.State;
import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
-import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.management.FeedConnectionRequest;
import org.apache.asterix.external.feed.management.FeedId;
import org.apache.asterix.external.feed.management.FeedJointKey;
import org.apache.asterix.external.feed.management.FeedWorkManager;
-import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.watch.FeedActivity;
import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
import org.apache.asterix.external.feed.watch.FeedJobInfo;
@@ -57,9 +53,7 @@ import org.apache.asterix.external.feed.watch.FeedJobInfo.JobType;
import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
-import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
@@ -85,7 +79,6 @@ public class FeedJobNotificationHandler implements Runnable {
private final Map<FeedId, FeedIntakeInfo> intakeJobInfos;
private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos;
private final Map<FeedId, Pair<FeedOperationCounter, List<IFeedJoint>>> feedPipeline;
- private final Map<FeedConnectionId, Pair<IIntakeProgressTracker, Long>> feedIntakeProgressTrackers;
public FeedJobNotificationHandler(LinkedBlockingQueue<FeedEvent> inbox) {
this.inbox = inbox;
@@ -94,7 +87,6 @@ public class FeedJobNotificationHandler implements Runnable {
this.connectJobInfos = new HashMap<FeedConnectionId, FeedConnectJobInfo>();
this.feedPipeline = new HashMap<FeedId, Pair<FeedOperationCounter, List<IFeedJoint>>>();
this.eventSubscribers = new HashMap<FeedConnectionId, List<IFeedLifecycleEventSubscriber>>();
- this.feedIntakeProgressTrackers = new HashMap<FeedConnectionId, Pair<IIntakeProgressTracker, Long>>();
}
@Override
@@ -124,29 +116,6 @@ public class FeedJobNotificationHandler implements Runnable {
}
}
- public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId,
- IIntakeProgressTracker feedIntakeProgressTracker) {
- if (feedIntakeProgressTrackers.get(connectionId) == null) {
- this.feedIntakeProgressTrackers.put(connectionId,
- new Pair<IIntakeProgressTracker, Long>(feedIntakeProgressTracker, 0L));
- } else {
- throw new IllegalStateException(
- " Progress tracker for connection " + connectionId + " is alreader registered");
- }
- }
-
- public void deregisterFeedIntakeProgressTracker(FeedConnectionId connectionId) {
- this.feedIntakeProgressTrackers.remove(connectionId);
- }
-
- public void updateTrackingInformation(StorageReportFeedMessage srm) {
- Pair<IIntakeProgressTracker, Long> p = feedIntakeProgressTrackers.get(srm.getConnectionId());
- if (p != null && p.second < srm.getLastPersistedTupleIntakeTimestamp()) {
- p.second = srm.getLastPersistedTupleIntakeTimestamp();
- p.first.notifyIngestedTupleTimestamp(p.second);
- }
- }
-
public Collection<FeedIntakeInfo> getFeedIntakeInfos() {
return intakeJobInfos.values();
}
@@ -358,8 +327,6 @@ public class FeedJobNotificationHandler implements Runnable {
}
}
cInfo.setState(FeedJobState.ACTIVE);
- // register activity in metadata
- registerFeedActivity(cInfo);
}
private void notifyFeedEventSubscribers(FeedJobInfo jobInfo, FeedLifecycleEvent event) {
@@ -489,63 +456,12 @@ public class FeedJobNotificationHandler implements Runnable {
connectJobInfos.remove(connectionId);
jobInfos.remove(cInfo.getJobId());
- feedIntakeProgressTrackers.remove(cInfo.getConnectionId());
- deregisterFeedActivity(cInfo);
// notify event listeners
FeedLifecycleEvent event = failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE
: FeedLifecycleEvent.FEED_COLLECT_ENDED;
notifyFeedEventSubscribers(cInfo, event);
}
- private void registerFeedActivity(FeedConnectJobInfo cInfo) {
- Map<String, String> feedActivityDetails = new HashMap<String, String>();
-
- if (cInfo.getCollectLocations() != null) {
- feedActivityDetails.put(FeedActivity.FeedActivityDetails.INTAKE_LOCATIONS,
- StringUtils.join(cInfo.getCollectLocations().iterator(), ','));
- }
-
- if (cInfo.getComputeLocations() != null) {
- feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS,
- StringUtils.join(cInfo.getComputeLocations().iterator(), ','));
- }
-
- if (cInfo.getStorageLocations() != null) {
- feedActivityDetails.put(FeedActivity.FeedActivityDetails.STORAGE_LOCATIONS,
- StringUtils.join(cInfo.getStorageLocations().iterator(), ','));
- }
-
- String policyName = cInfo.getFeedPolicy().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
- feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, policyName);
-
- feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_CONNECT_TIMESTAMP, (new Date()).toString());
- try {
- FeedActivity feedActivity = new FeedActivity(cInfo.getConnectionId().getFeedId().getDataverse(),
- cInfo.getConnectionId().getFeedId().getFeedName(), cInfo.getConnectionId().getDatasetName(),
- feedActivityDetails);
- CentralFeedManager.getInstance().getFeedLoadManager().reportFeedActivity(cInfo.getConnectionId(),
- feedActivity);
-
- } catch (Exception e) {
- e.printStackTrace();
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to register feed activity for " + cInfo + " " + e.getMessage());
- }
-
- }
-
- }
-
- public void deregisterFeedActivity(FeedConnectJobInfo cInfo) {
- try {
- CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(cInfo.getConnectionId());
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to deregister feed activity for " + cInfo + " " + e.getMessage());
- }
- }
- }
-
public boolean isRegisteredFeedJob(JobId jobId) {
return jobInfos.get(jobId) != null;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
index 161c863..b8435af 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
@@ -18,56 +18,27 @@
*/
package org.apache.asterix.app.external;
-import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.QueryTranslator;
-import org.apache.asterix.common.api.IClusterManagementWork;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.api.IClusterManagementWorkResponse;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.external.feed.api.IFeedJoint;
import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
import org.apache.asterix.external.feed.api.IFeedLifecycleListener;
-import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
-import org.apache.asterix.external.feed.management.FeedCollectInfo;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.management.FeedConnectionRequest;
import org.apache.asterix.external.feed.management.FeedId;
import org.apache.asterix.external.feed.management.FeedJointKey;
-import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
-import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
-import org.apache.asterix.external.feed.watch.FeedJobInfo;
import org.apache.asterix.external.feed.watch.FeedJobInfo.FeedJobState;
import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.statement.DataverseDecl;
-import org.apache.asterix.lang.common.statement.DisconnectFeedStatement;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.cluster.AddNodeWork;
-import org.apache.asterix.metadata.cluster.ClusterManager;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
@@ -84,31 +55,17 @@ import org.apache.hyracks.api.job.JobSpecification;
public class FeedLifecycleListener implements IFeedLifecycleListener {
private static final Logger LOGGER = Logger.getLogger(FeedLifecycleListener.class.getName());
-
public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
- private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
private final LinkedBlockingQueue<FeedEvent> jobEventInbox;
- private final LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
- private final Map<FeedCollectInfo, List<String>> dependentFeeds = new HashMap<FeedCollectInfo, List<String>>();
- private final Map<FeedConnectionId, LinkedBlockingQueue<String>> feedReportQueue;
private final FeedJobNotificationHandler feedJobNotificationHandler;
- private final FeedWorkRequestResponseHandler feedWorkRequestResponseHandler;
private final ExecutorService executorService;
- private ClusterState state;
-
private FeedLifecycleListener() {
this.jobEventInbox = new LinkedBlockingQueue<FeedEvent>();
this.feedJobNotificationHandler = new FeedJobNotificationHandler(jobEventInbox);
- this.responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>();
- this.feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox);
- this.feedReportQueue = new HashMap<FeedConnectionId, LinkedBlockingQueue<String>>();
this.executorService = Executors.newCachedThreadPool();
this.executorService.execute(feedJobNotificationHandler);
- this.executorService.execute(feedWorkRequestResponseHandler);
- ClusterManager.INSTANCE.registerSubscriber(this);
- this.state = AsterixClusterProperties.INSTANCE.getState();
}
@Override
@@ -133,19 +90,6 @@ public class FeedLifecycleListener implements IFeedLifecycleListener {
return feedJobNotificationHandler.getFeedConnectJobInfo(connectionId);
}
- public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId,
- IIntakeProgressTracker feedIntakeProgressTracker) {
- feedJobNotificationHandler.registerFeedIntakeProgressTracker(connectionId, feedIntakeProgressTracker);
- }
-
- public void deregisterFeedIntakeProgressTracker(FeedConnectionId connectionId) {
- feedJobNotificationHandler.deregisterFeedIntakeProgressTracker(connectionId);
- }
-
- public void updateTrackingInformation(StorageReportFeedMessage srm) {
- feedJobNotificationHandler.updateTrackingInformation(srm);
- }
-
/*
* Traverse job specification to categorize job as a feed intake job or a feed collection job
*/
@@ -201,209 +145,6 @@ public class FeedLifecycleListener implements IFeedLifecycleListener {
}
}
- @Override
- public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
- Set<IClusterManagementWork> workToBeDone = new HashSet<IClusterManagementWork>();
-
- Collection<FeedIntakeInfo> intakeInfos = feedJobNotificationHandler.getFeedIntakeInfos();
- Collection<FeedConnectJobInfo> connectJobInfos = feedJobNotificationHandler.getFeedConnectInfos();
-
- Map<String, List<FeedJobInfo>> impactedJobs = new HashMap<String, List<FeedJobInfo>>();
-
- for (String deadNode : deadNodeIds) {
- for (FeedIntakeInfo intakeInfo : intakeInfos) {
- if (intakeInfo.getIntakeLocation().contains(deadNode)) {
- List<FeedJobInfo> infos = impactedJobs.get(deadNode);
- if (infos == null) {
- infos = new ArrayList<FeedJobInfo>();
- impactedJobs.put(deadNode, infos);
- }
- infos.add(intakeInfo);
- intakeInfo.setState(FeedJobState.UNDER_RECOVERY);
- }
- }
-
- for (FeedConnectJobInfo connectInfo : connectJobInfos) {
- if (connectInfo.getStorageLocations().contains(deadNode)) {
- continue;
- }
- if (connectInfo.getComputeLocations().contains(deadNode)
- || connectInfo.getCollectLocations().contains(deadNode)) {
- List<FeedJobInfo> infos = impactedJobs.get(deadNode);
- if (infos == null) {
- infos = new ArrayList<FeedJobInfo>();
- impactedJobs.put(deadNode, infos);
- }
- infos.add(connectInfo);
- connectInfo.setState(FeedJobState.UNDER_RECOVERY);
- feedJobNotificationHandler.deregisterFeedActivity(connectInfo);
- }
- }
-
- }
-
- if (impactedJobs.size() > 0) {
- AddNodeWork addNodeWork = new AddNodeWork(deadNodeIds, deadNodeIds.size(), this);
- feedWorkRequestResponseHandler.registerFeedWork(addNodeWork.getWorkId(), impactedJobs);
- workToBeDone.add(addNodeWork);
- }
- return workToBeDone;
-
- }
-
- public static class FailureReport {
-
- private final List<Pair<FeedConnectJobInfo, List<String>>> recoverableConnectJobs;
- private final Map<IFeedJoint, List<String>> recoverableIntakeFeedIds;
-
- public FailureReport(Map<IFeedJoint, List<String>> recoverableIntakeFeedIds,
- List<Pair<FeedConnectJobInfo, List<String>>> recoverableSubscribers) {
- this.recoverableConnectJobs = recoverableSubscribers;
- this.recoverableIntakeFeedIds = recoverableIntakeFeedIds;
- }
-
- public List<Pair<FeedConnectJobInfo, List<String>>> getRecoverableSubscribers() {
- return recoverableConnectJobs;
- }
-
- public Map<IFeedJoint, List<String>> getRecoverableIntakeFeedIds() {
- return recoverableIntakeFeedIds;
- }
-
- }
-
- @Override
- public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
- ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(joinedNodeId + " joined the cluster. " + "Asterix state: " + newState);
- }
-
- boolean needToReActivateFeeds = !newState.equals(state) && (newState == ClusterState.ACTIVE);
- if (needToReActivateFeeds) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(joinedNodeId + " Resuming loser feeds (if any)");
- }
- try {
- FeedsActivator activator = new FeedsActivator();
- (new Thread(activator)).start();
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Exception in resuming feeds" + e.getMessage());
- }
- }
- state = newState;
- } else {
- List<FeedCollectInfo> feedsThatCanBeRevived = new ArrayList<FeedCollectInfo>();
- for (Entry<FeedCollectInfo, List<String>> entry : dependentFeeds.entrySet()) {
- List<String> requiredNodeIds = entry.getValue();
- if (requiredNodeIds.contains(joinedNodeId)) {
- requiredNodeIds.remove(joinedNodeId);
- if (requiredNodeIds.isEmpty()) {
- feedsThatCanBeRevived.add(entry.getKey());
- }
- }
- }
- if (!feedsThatCanBeRevived.isEmpty()) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(joinedNodeId + " Resuming feeds after rejoining of node " + joinedNodeId);
- }
- FeedsActivator activator = new FeedsActivator(feedsThatCanBeRevived);
- (new Thread(activator)).start();
- }
- }
- return null;
- }
-
- @Override
- public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
- try {
- responseInbox.put(response);
- } catch (InterruptedException e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Interrupted exception");
- }
- }
- }
-
- @Override
- public void notifyStateChange(ClusterState previousState, ClusterState newState) {
- switch (newState) {
- case ACTIVE:
- if (previousState.equals(ClusterState.UNUSABLE)) {
- try {
- // TODO: Figure out why code was commented
- // FeedsActivator activator = new FeedsActivator();
- // (new Thread(activator)).start();
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Exception in resuming feeds" + e.getMessage());
- }
- }
- }
- break;
- default:
- break;
- }
-
- }
-
- public static class FeedsDeActivator implements Runnable {
-
- private List<FeedConnectJobInfo> failedConnectjobs;
-
- public FeedsDeActivator(List<FeedConnectJobInfo> failedConnectjobs) {
- this.failedConnectjobs = failedConnectjobs;
- }
-
- @Override
- public void run() {
- for (FeedConnectJobInfo failedConnectJob : failedConnectjobs) {
- endFeed(failedConnectJob);
- }
- }
-
- private void endFeed(FeedConnectJobInfo cInfo) {
- MetadataTransactionContext ctx = null;
- PrintWriter writer = new PrintWriter(System.out, true);
- SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
-
- try {
- ctx = MetadataManager.INSTANCE.beginTransaction();
- FeedId feedId = cInfo.getConnectionId().getFeedId();
- DisconnectFeedStatement stmt = new DisconnectFeedStatement(new Identifier(feedId.getDataverse()),
- new Identifier(feedId.getFeedName()), new Identifier(cInfo.getConnectionId().getDatasetName()));
- List<Statement> statements = new ArrayList<Statement>();
- DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(feedId.getDataverse()));
- statements.add(dataverseDecl);
- statements.add(stmt);
- QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
- translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
- QueryTranslator.ResultDelivery.SYNC);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("End irrecoverable feed: " + cInfo.getConnectionId());
- }
- MetadataManager.INSTANCE.commitTransaction(ctx);
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Exception in ending loser feed: " + cInfo.getConnectionId() + " Exception "
- + e.getMessage());
- }
- e.printStackTrace();
- try {
- MetadataManager.INSTANCE.abortTransaction(ctx);
- } catch (Exception e2) {
- e2.addSuppressed(e);
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Exception in aborting transaction! System is in inconsistent state");
- }
- }
-
- }
-
- }
- }
-
public void submitFeedConnectionRequest(IFeedJoint feedPoint, FeedConnectionRequest subscriptionRequest)
throws Exception {
feedJobNotificationHandler.submitFeedConnectionRequest(feedPoint, subscriptionRequest);
@@ -451,21 +192,6 @@ public class FeedLifecycleListener implements IFeedLifecycleListener {
return feedJobNotificationHandler.isFeedConnectionActive(connectionId, eventSubscriber);
}
- public void reportPartialDisconnection(FeedConnectionId connectionId) {
- }
-
- public void registerFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
- feedReportQueue.put(feedId, queue);
- }
-
- public void deregisterFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
- feedReportQueue.remove(feedId);
- }
-
- public LinkedBlockingQueue<String> getFeedReportQueue(FeedConnectionId feedId) {
- return feedReportQueue.get(feedId);
- }
-
@Override
public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
return feedJobNotificationHandler.getAvailableFeedJoint(feedJointKey);
@@ -495,14 +221,6 @@ public class FeedLifecycleListener implements IFeedLifecycleListener {
}
- public JobSpecification getCollectJobSpecification(FeedConnectionId connectionId) {
- return feedJobNotificationHandler.getCollectJobSpecification(connectionId);
- }
-
- public JobId getFeedCollectJobId(FeedConnectionId connectionId) {
- return feedJobNotificationHandler.getFeedCollectJobId(connectionId);
- }
-
public synchronized void notifyPartitionStart(FeedId feedId, JobId jobId) {
jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.PARTITION_START, feedId));
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLoadManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLoadManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLoadManager.java
deleted file mode 100644
index b6be1e7..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLoadManager.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.feed.api.IFeedLoadManager;
-import org.apache.asterix.external.feed.api.IFeedTrackingManager;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.FeedCongestionMessage;
-import org.apache.asterix.external.feed.message.FeedReportMessage;
-import org.apache.asterix.external.feed.message.PrepareStallMessage;
-import org.apache.asterix.external.feed.message.ScaleInReportMessage;
-import org.apache.asterix.external.feed.message.TerminateDataFlowMessage;
-import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.feed.watch.FeedActivity;
-import org.apache.asterix.external.feed.watch.NodeLoadReport;
-import org.apache.asterix.external.feed.watch.FeedJobInfo.FeedJobState;
-import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedLoadManager implements IFeedLoadManager {
-
- private static final Logger LOGGER = Logger.getLogger(FeedLoadManager.class.getName());
-
- private static final long MIN_MODIFICATION_INTERVAL = 180000; // 10 seconds
- private final TreeSet<NodeLoadReport> nodeReports;
- private final Map<FeedConnectionId, FeedActivity> feedActivities;
- private final Map<String, Pair<Integer, Integer>> feedMetrics;
-
- private FeedConnectionId lastModified;
- private long lastModifiedTimestamp;
-
- private static final int UNKNOWN = -1;
-
- public FeedLoadManager() {
- this.nodeReports = new TreeSet<NodeLoadReport>();
- this.feedActivities = new HashMap<FeedConnectionId, FeedActivity>();
- this.feedMetrics = new HashMap<String, Pair<Integer, Integer>>();
- }
-
- @Override
- public void submitNodeLoadReport(NodeLoadReport report) {
- nodeReports.remove(report);
- nodeReports.add(report);
- }
-
- @Override
- public void reportCongestion(FeedCongestionMessage message) throws AsterixException {
- FeedRuntimeId runtimeId = message.getRuntimeId();
- FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
- if (jobState == null
- || (jobState.equals(FeedJobState.UNDER_RECOVERY))
- || (message.getConnectionId().equals(lastModified) && System.currentTimeMillis()
- - lastModifiedTimestamp < MIN_MODIFICATION_INTERVAL)) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Ignoring congestion report from " + runtimeId);
- }
- return;
- } else {
- try {
- FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
- int inflowRate = message.getInflowRate();
- int outflowRate = message.getOutflowRate();
- List<String> currentComputeLocations = new ArrayList<String>();
- currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message
- .getConnectionId().getFeedId()));
- int computeCardinality = currentComputeLocations.size();
- int requiredCardinality = (int) Math
- .ceil((double) ((computeCardinality * inflowRate) / (double) outflowRate)) + 5;
- int additionalComputeNodes = requiredCardinality - computeCardinality;
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("INCREASING COMPUTE CARDINALITY from " + computeCardinality + " by "
- + additionalComputeNodes);
- }
-
- List<String> helperComputeNodes = getNodeForSubstitution(additionalComputeNodes);
-
- // Step 1) Alter the original feed job to adjust the cardinality
- JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
- .getConnectionId());
- helperComputeNodes.addAll(currentComputeLocations);
- List<String> newLocations = new ArrayList<String>();
- newLocations.addAll(currentComputeLocations);
- newLocations.addAll(helperComputeNodes);
- FeedMetadataUtil.increaseCardinality(jobSpec, FeedRuntimeType.COMPUTE, requiredCardinality, newLocations);
-
- // Step 2) send prepare to stall message
- gracefullyTerminateDataFlow(message.getConnectionId(), Integer.MAX_VALUE);
-
- // Step 3) run the altered job specification
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("New Job after adjusting to the workload " + jobSpec);
- }
-
- Thread.sleep(10000);
- runJob(jobSpec, false);
- lastModified = message.getConnectionId();
- lastModifiedTimestamp = System.currentTimeMillis();
-
- } catch (Exception e) {
- e.printStackTrace();
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Unable to form the required job for scaling in/out" + e.getMessage());
- }
- throw new AsterixException(e);
- }
- }
- }
-
- @Override
- public void submitScaleInPossibleReport(ScaleInReportMessage message) throws Exception {
- FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
- if (jobState == null || (jobState.equals(FeedJobState.UNDER_RECOVERY))) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("JobState information for job " + "[" + message.getConnectionId() + "]" + " not found ");
- }
- return;
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Processing scale-in message " + message);
- }
- FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
- JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
- .getConnectionId());
- int reducedCardinality = message.getReducedCardinaliy();
- List<String> currentComputeLocations = new ArrayList<String>();
- currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message.getConnectionId()
- .getFeedId()));
- FeedMetadataUtil.decreaseComputeCardinality(jobSpec, FeedRuntimeType.COMPUTE, reducedCardinality,
- currentComputeLocations);
-
- gracefullyTerminateDataFlow(message.getConnectionId(), reducedCardinality - 1);
- Thread.sleep(3000);
- JobId newJobId = runJob(jobSpec, false);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Launch modified job" + "[" + newJobId + "]" + "for scale-in \n" + jobSpec);
- }
-
- }
- }
-
- private void gracefullyTerminateDataFlow(FeedConnectionId connectionId, int computePartitionRetainLimit)
- throws Exception {
- // Step 1) send prepare to stall message
- PrepareStallMessage stallMessage = new PrepareStallMessage(connectionId, computePartitionRetainLimit);
- List<String> intakeLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
- List<String> computeLocations = FeedLifecycleListener.INSTANCE.getComputeLocations(connectionId.getFeedId());
- List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
-
- Set<String> operatorLocations = new HashSet<String>();
-
- operatorLocations.addAll(intakeLocations);
- operatorLocations.addAll(computeLocations);
- operatorLocations.addAll(storageLocations);
-
- JobSpecification messageJobSpec = FeedOperations.buildPrepareStallMessageJob(stallMessage, operatorLocations);
- runJob(messageJobSpec, true);
-
- // Step 2)
- TerminateDataFlowMessage terminateMesg = new TerminateDataFlowMessage(connectionId);
- messageJobSpec = FeedOperations.buildTerminateFlowMessageJob(terminateMesg, intakeLocations);
- runJob(messageJobSpec, true);
- }
-
- public static JobId runJob(JobSpecification spec, boolean waitForCompletion) throws Exception {
- IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
- JobId jobId = hcc.startJob(spec);
- if (waitForCompletion) {
- hcc.waitForCompletion(jobId);
- }
- return jobId;
- }
-
- @Override
- public void submitFeedRuntimeReport(FeedReportMessage report) {
- String key = "" + report.getConnectionId() + ":" + report.getRuntimeId().getFeedRuntimeType();
- Pair<Integer, Integer> value = feedMetrics.get(key);
- if (value == null) {
- value = new Pair<Integer, Integer>(report.getValue(), 1);
- feedMetrics.put(key, value);
- } else {
- value.first = value.first + report.getValue();
- value.second = value.second + 1;
- }
- }
-
- @Override
- public int getOutflowRate(FeedConnectionId connectionId, FeedRuntimeType runtimeType) {
- int rVal;
- String key = "" + connectionId + ":" + runtimeType;
- Pair<Integer, Integer> value = feedMetrics.get(key);
- if (value == null) {
- rVal = UNKNOWN;
- } else {
- rVal = value.first / value.second;
- }
- return rVal;
- }
-
- private List<String> getNodeForSubstitution(int nRequired) {
- List<String> nodeIds = new ArrayList<String>();
- Iterator<NodeLoadReport> it = null;
- int nAdded = 0;
- while (nAdded < nRequired) {
- it = nodeReports.iterator();
- while (it.hasNext()) {
- nodeIds.add(it.next().getNodeId());
- nAdded++;
- }
- }
- return nodeIds;
- }
-
- @Override
- public synchronized List<String> getNodes(int required) {
- Iterator<NodeLoadReport> it;
- List<String> allocated = new ArrayList<String>();
- while (allocated.size() < required) {
- it = nodeReports.iterator();
- while (it.hasNext() && allocated.size() < required) {
- allocated.add(it.next().getNodeId());
- }
- }
- return allocated;
- }
-
- @Override
- public void reportThrottlingEnabled(ThrottlingEnabledFeedMessage mesg) throws AsterixException, Exception {
- System.out.println("Throttling Enabled for " + mesg.getConnectionId() + " " + mesg.getFeedRuntimeId());
- FeedConnectionId connectionId = mesg.getConnectionId();
- List<String> destinationLocations = new ArrayList<String>();
- List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
- List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
-
- destinationLocations.addAll(storageLocations);
- destinationLocations.addAll(collectLocations);
- JobSpecification messageJobSpec = FeedOperations.buildNotifyThrottlingEnabledMessageJob(mesg,
- destinationLocations);
- runJob(messageJobSpec, true);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.warning("Acking disabled for " + mesg.getConnectionId() + " in view of activated throttling");
- }
- IFeedTrackingManager trackingManager = CentralFeedManager.getInstance().getFeedTrackingManager();
- trackingManager.disableAcking(connectionId);
- }
-
- @Override
- public void reportFeedActivity(FeedConnectionId connectionId, FeedActivity activity) {
- feedActivities.put(connectionId, activity);
- }
-
- @Override
- public FeedActivity getFeedActivity(FeedConnectionId connectionId) {
- return feedActivities.get(connectionId);
- }
-
- @Override
- public Collection<FeedActivity> getFeedActivities() {
- return feedActivities.values();
- }
-
- @Override
- public void removeFeedActivity(FeedConnectionId connectionId) {
- feedActivities.remove(connectionId);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedMessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedMessageReceiver.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedMessageReceiver.java
deleted file mode 100644
index bff1a4d..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedMessageReceiver.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.util.logging.Level;
-
-import org.apache.asterix.app.external.CentralFeedManager.AQLExecutor;
-import org.apache.asterix.external.feed.api.IFeedLoadManager;
-import org.apache.asterix.external.feed.api.IFeedMessage.MessageType;
-import org.apache.asterix.external.feed.api.IFeedTrackingManager;
-import org.apache.asterix.external.feed.message.FeedCongestionMessage;
-import org.apache.asterix.external.feed.message.FeedReportMessage;
-import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
-import org.apache.asterix.external.feed.message.MessageReceiver;
-import org.apache.asterix.external.feed.message.ScaleInReportMessage;
-import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
-import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
-import org.apache.asterix.external.feed.watch.NodeLoadReport;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.hyracks.bootstrap.FeedBootstrap;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.json.JSONObject;
-
-public class FeedMessageReceiver extends MessageReceiver<String> {
-
- private static boolean initialized;
-
- private final IFeedLoadManager feedLoadManager;
- private final IFeedTrackingManager feedTrackingManager;
-
- public FeedMessageReceiver(CentralFeedManager centralFeedManager) {
- this.feedLoadManager = centralFeedManager.getFeedLoadManager();
- this.feedTrackingManager = centralFeedManager.getFeedTrackingManager();
- }
-
- @Override
- public void processMessage(String message) throws Exception {
- JSONObject obj = new JSONObject(message);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Received message " + obj);
- }
- MessageType messageType = MessageType.valueOf(obj.getString(FeedConstants.MessageConstants.MESSAGE_TYPE));
- switch (messageType) {
- case XAQL:
- if (!initialized) {
- FeedBootstrap.setUpInitialArtifacts();
- initialized = true;
- }
- AQLExecutor.executeAQL(obj.getString(FeedConstants.MessageConstants.AQL));
- break;
- case CONGESTION:
- feedLoadManager.reportCongestion(FeedCongestionMessage.read(obj));
- break;
- case FEED_REPORT:
- feedLoadManager.submitFeedRuntimeReport(FeedReportMessage.read(obj));
- break;
- case NODE_REPORT:
- feedLoadManager.submitNodeLoadReport(NodeLoadReport.read(obj));
- break;
- case SCALE_IN_REQUEST:
- feedLoadManager.submitScaleInPossibleReport(ScaleInReportMessage.read(obj));
- break;
- case STORAGE_REPORT:
- FeedLifecycleListener.INSTANCE.updateTrackingInformation(StorageReportFeedMessage.read(obj));
- break;
- case COMMIT_ACK:
- feedTrackingManager.submitAckReport(FeedTupleCommitAckMessage.read(obj));
- break;
- case THROTTLING_ENABLED:
- feedLoadManager.reportThrottlingEnabled(ThrottlingEnabledFeedMessage.read(obj));
- default:
- break;
- }
-
- }
-
- @Override
- public void emptyInbox() throws HyracksDataException {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index d8f1893..2de0266 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@ -32,10 +32,6 @@ import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.management.FeedId;
import org.apache.asterix.external.feed.message.EndFeedMessage;
-import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
-import org.apache.asterix.external.feed.message.PrepareStallMessage;
-import org.apache.asterix.external.feed.message.TerminateDataFlowMessage;
-import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor;
@@ -92,29 +88,6 @@ public class FeedOperations {
return new Pair<JobSpecification, IAdapterFactory>(spec, adapterFactory);
}
- public static JobSpecification buildDiscontinueFeedSourceSpec(AqlMetadataProvider metadataProvider, FeedId feedId)
- throws AsterixException, AlgebricksException {
-
- JobSpecification spec = JobSpecificationUtils.createJobSpecification();
- IOperatorDescriptor feedMessenger = null;
- AlgebricksPartitionConstraint messengerPc = null;
-
- List<String> locations = FeedLifecycleListener.INSTANCE.getIntakeLocations(feedId);
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDiscontinueFeedMessengerRuntime(spec, feedId,
- locations);
-
- feedMessenger = p.first;
- messengerPc = p.second;
-
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
- NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
- spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
- spec.addRoot(nullSink);
-
- return spec;
- }
-
/**
* Builds the job spec for sending message to an active feed to disconnect it from the
* its source.
@@ -162,66 +135,6 @@ public class FeedOperations {
}
- public static JobSpecification buildPrepareStallMessageJob(PrepareStallMessage stallMessage,
- Collection<String> collectLocations) throws AsterixException {
- JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
- try {
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
- messageJobSpec, stallMessage.getConnectionId(), stallMessage, collectLocations);
- buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
- } catch (AlgebricksException ae) {
- throw new AsterixException(ae);
- }
- return messageJobSpec;
- }
-
- public static JobSpecification buildNotifyThrottlingEnabledMessageJob(
- ThrottlingEnabledFeedMessage throttlingEnabledMesg, Collection<String> locations) throws AsterixException {
- JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
- try {
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
- messageJobSpec, throttlingEnabledMesg.getConnectionId(), throttlingEnabledMesg, locations);
- buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
- } catch (AlgebricksException ae) {
- throw new AsterixException(ae);
- }
- return messageJobSpec;
- }
-
- public static JobSpecification buildTerminateFlowMessageJob(TerminateDataFlowMessage terminateMessage,
- List<String> collectLocations) throws AsterixException {
- JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
- try {
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
- messageJobSpec, terminateMessage.getConnectionId(), terminateMessage, collectLocations);
- buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
- } catch (AlgebricksException ae) {
- throw new AsterixException(ae);
- }
- return messageJobSpec;
- }
-
- public static JobSpecification buildCommitAckResponseJob(FeedTupleCommitResponseMessage commitResponseMessage,
- Collection<String> targetLocations) throws AsterixException {
- JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
- try {
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
- messageJobSpec, commitResponseMessage.getConnectionId(), commitResponseMessage, targetLocations);
- buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
- } catch (AlgebricksException ae) {
- throw new AsterixException(ae);
- }
- return messageJobSpec;
- }
-
- public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDiscontinueFeedMessengerRuntime(
- JobSpecification jobSpec, FeedId feedId, List<String> locations) throws AlgebricksException {
- FeedConnectionId feedConnectionId = new FeedConnectionId(feedId, null);
- IFeedMessage feedMessage = new EndFeedMessage(feedConnectionId, FeedRuntimeType.INTAKE,
- feedConnectionId.getFeedId(), true, EndFeedMessage.EndMessageType.DISCONTINUE_SOURCE);
- return buildSendFeedMessageRuntime(jobSpec, feedConnectionId, feedMessage, locations);
- }
-
private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildSendFeedMessageRuntime(
JobSpecification jobSpec, FeedConnectionId feedConenctionId, IFeedMessage feedMessage,
Collection<String> locations) throws AlgebricksException {
@@ -232,17 +145,6 @@ public class FeedOperations {
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
}
- private static JobSpecification buildSendFeedMessageJobSpec(IOperatorDescriptor operatorDescriptor,
- AlgebricksPartitionConstraint messengerPc, JobSpecification messageJobSpec) {
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(messageJobSpec, operatorDescriptor,
- messengerPc);
- NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(messageJobSpec);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(messageJobSpec, nullSink, messengerPc);
- messageJobSpec.connect(new OneToOneConnectorDescriptor(messageJobSpec), operatorDescriptor, 0, nullSink, 0);
- messageJobSpec.addRoot(nullSink);
- return messageJobSpec;
- }
-
private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
JobSpecification jobSpec, FeedConnectionId feedConenctionId, List<String> locations,
FeedRuntimeType sourceFeedRuntimeType, boolean completeDisconnection, FeedId sourceFeedId)
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedTrackingManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedTrackingManager.java
deleted file mode 100644
index 29230c1..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedTrackingManager.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IFeedTrackingManager;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
-import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedTrackingManager implements IFeedTrackingManager {
-
- private static final Logger LOGGER = Logger.getLogger(FeedTrackingManager.class.getName());
-
- private final BitSet allOnes;
-
- private Map<FeedConnectionId, Map<AckId, BitSet>> ackHistory;
- private Map<FeedConnectionId, Map<AckId, Integer>> maxBaseAcked;
-
- public FeedTrackingManager() {
- byte[] allOneBytes = new byte[128];
- Arrays.fill(allOneBytes, (byte) 0xff);
- allOnes = BitSet.valueOf(allOneBytes);
- ackHistory = new HashMap<FeedConnectionId, Map<AckId, BitSet>>();
- maxBaseAcked = new HashMap<FeedConnectionId, Map<AckId, Integer>>();
- }
-
- @Override
- public synchronized void submitAckReport(FeedTupleCommitAckMessage ackMessage) {
- AckId ackId = getAckId(ackMessage);
- Map<AckId, BitSet> acksForConnection = ackHistory.get(ackMessage.getConnectionId());
- if (acksForConnection == null) {
- acksForConnection = new HashMap<AckId, BitSet>();
- acksForConnection.put(ackId, BitSet.valueOf(ackMessage.getCommitAcks()));
- ackHistory.put(ackMessage.getConnectionId(), acksForConnection);
- }
- BitSet currentAcks = acksForConnection.get(ackId);
- if (currentAcks == null) {
- currentAcks = BitSet.valueOf(ackMessage.getCommitAcks());
- acksForConnection.put(ackId, currentAcks);
- } else {
- currentAcks.or(BitSet.valueOf(ackMessage.getCommitAcks()));
- }
- if (Arrays.equals(currentAcks.toByteArray(), allOnes.toByteArray())) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(ackMessage.getIntakePartition() + " (" + ackMessage.getBase() + ")" + " is convered");
- }
- Map<AckId, Integer> maxBaseAckedForConnection = maxBaseAcked.get(ackMessage.getConnectionId());
- if (maxBaseAckedForConnection == null) {
- maxBaseAckedForConnection = new HashMap<AckId, Integer>();
- maxBaseAcked.put(ackMessage.getConnectionId(), maxBaseAckedForConnection);
- }
- Integer maxBaseAckedValue = maxBaseAckedForConnection.get(ackId);
- if (maxBaseAckedValue == null) {
- maxBaseAckedValue = ackMessage.getBase();
- maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
- sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
- ackMessage.getBase());
- } else if (ackMessage.getBase() == maxBaseAckedValue + 1) {
- maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
- sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
- ackMessage.getBase());
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Ignoring discountiuous acked base " + ackMessage.getBase() + " for " + ackId);
- }
- }
-
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("AckId " + ackId + " pending number of acks " + (128 * 8 - currentAcks.cardinality()));
- }
- }
- }
-
- public synchronized void disableTracking(FeedConnectionId connectionId) {
- ackHistory.remove(connectionId);
- maxBaseAcked.remove(connectionId);
- }
-
- private void sendCommitResponseMessage(FeedConnectionId connectionId, int partition, int base) {
- FeedTupleCommitResponseMessage response = new FeedTupleCommitResponseMessage(connectionId, partition, base);
- List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
- List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
- String collectLocation = collectLocations.get(partition);
- Set<String> messageDestinations = new HashSet<String>();
- messageDestinations.add(collectLocation);
- messageDestinations.addAll(storageLocations);
- try {
- JobSpecification spec = FeedOperations.buildCommitAckResponseJob(response, messageDestinations);
- CentralFeedManager.runJob(spec, false);
- } catch (Exception e) {
- e.printStackTrace();
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to send commit response message " + response + " exception " + e.getMessage());
- }
- }
- }
-
- private static AckId getAckId(FeedTupleCommitAckMessage ackMessage) {
- return new AckId(ackMessage.getConnectionId(), ackMessage.getIntakePartition(), ackMessage.getBase());
- }
-
- private static class AckId {
- private FeedConnectionId connectionId;
- private int intakePartition;
- private int base;
-
- public AckId(FeedConnectionId connectionId, int intakePartition, int base) {
- this.connectionId = connectionId;
- this.intakePartition = intakePartition;
- this.base = base;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof AckId)) {
- return false;
- }
- AckId other = (AckId) o;
- return other.getConnectionId().equals(connectionId) && other.getIntakePartition() == intakePartition
- && other.getBase() == base;
- }
-
- @Override
- public String toString() {
- return connectionId + "[" + intakePartition + "]" + "(" + base + ")";
- }
-
- @Override
- public int hashCode() {
- return toString().hashCode();
- }
-
- public FeedConnectionId getConnectionId() {
- return connectionId;
- }
-
- public int getIntakePartition() {
- return intakePartition;
- }
-
- public int getBase() {
- return base;
- }
-
- }
-
- @Override
- public void disableAcking(FeedConnectionId connectionId) {
- ackHistory.remove(connectionId);
- maxBaseAcked.remove(connectionId);
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Acking disabled for " + connectionId);
- }
- }
-
-}