You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:06 UTC
[12/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java
new file mode 100644
index 0000000..fc06ab9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.feed.watch;
+
+import java.util.Map;
+
+public class FeedActivity implements Comparable<FeedActivity> {
+
+ private int activityId;
+
+ private final String dataverseName;
+ private final String datasetName;
+ private final String feedName;
+ private final Map<String, String> feedActivityDetails;
+
+ public static class FeedActivityDetails {
+ public static final String INTAKE_LOCATIONS = "intake-locations";
+ public static final String COMPUTE_LOCATIONS = "compute-locations";
+ public static final String STORAGE_LOCATIONS = "storage-locations";
+ public static final String COLLECT_LOCATIONS = "collect-locations";
+ public static final String FEED_POLICY_NAME = "feed-policy-name";
+ public static final String FEED_CONNECT_TIMESTAMP = "feed-connect-timestamp";
+ }
+
+ public FeedActivity(String dataverseName, String feedName, String datasetName,
+ Map<String, String> feedActivityDetails) {
+ this.dataverseName = dataverseName;
+ this.feedName = feedName;
+ this.datasetName = datasetName;
+ this.feedActivityDetails = feedActivityDetails;
+ }
+
+ public String getDataverseName() {
+ return dataverseName;
+ }
+
+ public String getDatasetName() {
+ return datasetName;
+ }
+
+ public String getFeedName() {
+ return feedName;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof FeedActivity)) {
+ return false;
+ }
+
+ if (!((FeedActivity) other).dataverseName.equals(dataverseName)) {
+ return false;
+ }
+ if (!((FeedActivity) other).datasetName.equals(datasetName)) {
+ return false;
+ }
+ if (!((FeedActivity) other).getFeedName().equals(feedName)) {
+ return false;
+ }
+ if (((FeedActivity) other).getActivityId() != (activityId)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return dataverseName + "." + feedName + " --> " + datasetName + " " + activityId;
+ }
+
+ public String getConnectTimestamp() {
+ return feedActivityDetails.get(FeedActivityDetails.FEED_CONNECT_TIMESTAMP);
+ }
+
+ public int getActivityId() {
+ return activityId;
+ }
+
+ public void setActivityId(int activityId) {
+ this.activityId = activityId;
+ }
+
+ public Map<String, String> getFeedActivityDetails() {
+ return feedActivityDetails;
+ }
+
+ @Override
+ public int compareTo(FeedActivity o) {
+ return o.getActivityId() - this.activityId;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
new file mode 100644
index 0000000..3e42169
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.feed.api.IFeedJoint;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class FeedConnectJobInfo extends FeedJobInfo {
+
+ private final FeedConnectionId connectionId;
+ private final Map<String, String> feedPolicy;
+ private final IFeedJoint sourceFeedJoint;
+ private IFeedJoint computeFeedJoint;
+
+ private List<String> collectLocations;
+ private List<String> computeLocations;
+ private List<String> storageLocations;
+
+ public FeedConnectJobInfo(JobId jobId, FeedJobState state, FeedConnectionId connectionId,
+ IFeedJoint sourceFeedJoint, IFeedJoint computeFeedJoint, JobSpecification spec,
+ Map<String, String> feedPolicy) {
+ super(jobId, state, FeedJobInfo.JobType.FEED_CONNECT, spec);
+ this.connectionId = connectionId;
+ this.sourceFeedJoint = sourceFeedJoint;
+ this.computeFeedJoint = computeFeedJoint;
+ this.feedPolicy = feedPolicy;
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public List<String> getCollectLocations() {
+ return collectLocations;
+ }
+
+ public List<String> getComputeLocations() {
+ return computeLocations;
+ }
+
+ public List<String> getStorageLocations() {
+ return storageLocations;
+ }
+
+ public void setCollectLocations(List<String> collectLocations) {
+ this.collectLocations = collectLocations;
+ }
+
+ public void setComputeLocations(List<String> computeLocations) {
+ this.computeLocations = computeLocations;
+ }
+
+ public void setStorageLocations(List<String> storageLocations) {
+ this.storageLocations = storageLocations;
+ }
+
+ public IFeedJoint getSourceFeedJoint() {
+ return sourceFeedJoint;
+ }
+
+ public IFeedJoint getComputeFeedJoint() {
+ return computeFeedJoint;
+ }
+
+ public Map<String, String> getFeedPolicy() {
+ return feedPolicy;
+ }
+
+ public void setComputeFeedJoint(IFeedJoint computeFeedJoint) {
+ this.computeFeedJoint = computeFeedJoint;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
new file mode 100644
index 0000000..3b11811
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.util.List;
+
+import org.apache.asterix.external.feed.api.IFeedJoint;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class FeedIntakeInfo extends FeedJobInfo {
+
+ private final FeedId feedId;
+ private final IFeedJoint intakeFeedJoint;
+ private final JobSpecification spec;
+ private List<String> intakeLocation;
+
+ public FeedIntakeInfo(JobId jobId, FeedJobState state, JobType jobType, FeedId feedId, IFeedJoint intakeFeedJoint,
+ JobSpecification spec) {
+ super(jobId, state, FeedJobInfo.JobType.INTAKE, spec);
+ this.feedId = feedId;
+ this.intakeFeedJoint = intakeFeedJoint;
+ this.spec = spec;
+ }
+
+ public FeedId getFeedId() {
+ return feedId;
+ }
+
+ public IFeedJoint getIntakeFeedJoint() {
+ return intakeFeedJoint;
+ }
+
+ public JobSpecification getSpec() {
+ return spec;
+ }
+
+ public List<String> getIntakeLocation() {
+ return intakeLocation;
+ }
+
+ public void setIntakeLocation(List<String> intakeLocation) {
+ this.intakeLocation = intakeLocation;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java
new file mode 100644
index 0000000..92e00cb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class FeedJobInfo {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedJobInfo.class.getName());
+
+ public enum JobType {
+ INTAKE,
+ FEED_CONNECT
+ }
+
+ public enum FeedJobState {
+ CREATED,
+ ACTIVE,
+ UNDER_RECOVERY,
+ ENDED
+ }
+
+ protected final JobId jobId;
+ protected final JobType jobType;
+ protected FeedJobState state;
+ protected JobSpecification spec;
+
+ public FeedJobInfo(JobId jobId, FeedJobState state, JobType jobType, JobSpecification spec) {
+ this.jobId = jobId;
+ this.state = state;
+ this.jobType = jobType;
+ this.spec = spec;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public FeedJobState getState() {
+ return state;
+ }
+
+ public void setState(FeedJobState state) {
+ this.state = state;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(this + " is in " + state + " state.");
+ }
+ }
+
+ public JobType getJobType() {
+ return jobType;
+ }
+
+ public JobSpecification getSpec() {
+ return spec;
+ }
+
+ public void setSpec(JobSpecification spec) {
+ this.spec = spec;
+ }
+
+ public String toString() {
+ return jobId + " [" + jobType + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedMetricCollector.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedMetricCollector.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedMetricCollector.java
new file mode 100644
index 0000000..f0db639
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedMetricCollector.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IFeedMetricCollector;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+
+public class FeedMetricCollector implements IFeedMetricCollector {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedMetricCollector.class.getName());
+
+ private static final int UNKNOWN = -1;
+
+ private final AtomicInteger globalSenderId = new AtomicInteger(1);
+ private final Map<Integer, Sender> senders = new HashMap<Integer, Sender>();
+ private final Map<Integer, Series> statHistory = new HashMap<Integer, Series>();
+ private final Map<String, Sender> sendersByName = new HashMap<String, Sender>();
+
+ public FeedMetricCollector(String nodeId) {
+ }
+
+ @Override
+ public synchronized int createReportSender(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+ ValueType valueType, MetricType metricType) {
+ Sender sender = new Sender(globalSenderId.getAndIncrement(), connectionId, runtimeId, valueType, metricType);
+ senders.put(sender.senderId, sender);
+ sendersByName.put(sender.getDisplayName(), sender);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Sender id " + sender.getSenderId() + " created for " + sender);
+ }
+ return sender.senderId;
+ }
+
+ @Override
+ public void removeReportSender(int senderId) {
+ Sender sender = senders.get(senderId);
+ if (sender != null) {
+ statHistory.remove(senderId);
+ senders.remove(senderId);
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to remove sender Id");
+ }
+ throw new IllegalStateException("Unable to remove sender Id " + senderId + " senders " + senders);
+ }
+ }
+
+ @Override
+ public boolean sendReport(int senderId, int value) {
+ Sender sender = senders.get(senderId);
+ if (sender != null) {
+ Series series = statHistory.get(sender.senderId);
+ if (series == null) {
+ switch (sender.mType) {
+ case AVG:
+ series = new SeriesAvg();
+ break;
+ case RATE:
+ series = new SeriesRate();
+ break;
+ }
+ statHistory.put(sender.senderId, series);
+ }
+ series.addValue(value);
+ return true;
+ }
+ throw new IllegalStateException("Unable to send report sender Id " + senderId + " senders " + senders);
+ }
+
+ @Override
+ public void resetReportSender(int senderId) {
+ Sender sender = senders.get(senderId);
+ if (sender != null) {
+ Series series = statHistory.get(sender.senderId);
+ if (series != null) {
+ series.reset();
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Sender with id " + senderId + " not found. Unable to reset!");
+ }
+ throw new IllegalStateException("Unable to reset sender Id " + senderId + " senders " + senders);
+ }
+ }
+
+ private static class Sender {
+
+ private final int senderId;
+ private final MetricType mType;
+ private final String displayName;
+
+ public Sender(int senderId, FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType,
+ MetricType mType) {
+ this.senderId = senderId;
+ this.mType = mType;
+ this.displayName = createDisplayName(connectionId, runtimeId, valueType);
+ }
+
+ @Override
+ public String toString() {
+ return displayName + "[" + senderId + "]" + "(" + mType + ")";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Sender)) {
+ return false;
+ }
+ return ((Sender) o).senderId == senderId;
+ }
+
+ @Override
+ public int hashCode() {
+ return senderId;
+ }
+
+ public static String createDisplayName(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+ ValueType valueType) {
+ return connectionId + " (" + runtimeId.getFeedRuntimeType() + " )" + "[" + runtimeId.getPartition() + "]"
+ + "{" + valueType + "}";
+ }
+
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ public int getSenderId() {
+ return senderId;
+ }
+ }
+
+ @Override
+ public int getMetric(int senderId) {
+ Sender sender = senders.get(senderId);
+ return getMetric(sender);
+ }
+
+ @Override
+ public int getMetric(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType) {
+ String displayName = Sender.createDisplayName(connectionId, runtimeId, valueType);
+ Sender sender = sendersByName.get(displayName);
+ return getMetric(sender);
+ }
+
+ private int getMetric(Sender sender) {
+ if (sender == null || statHistory.get(sender.getSenderId()) == null) {
+ return UNKNOWN;
+ }
+
+ float result = -1;
+ Series series = statHistory.get(sender.getSenderId());
+ switch (sender.mType) {
+ case AVG:
+ result = ((SeriesAvg) series).getAvg();
+ break;
+ case RATE:
+ result = ((SeriesRate) series).getRate();
+ break;
+ }
+ return (int) result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakePartitionStatistics.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakePartitionStatistics.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakePartitionStatistics.java
new file mode 100644
index 0000000..acfd1fb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakePartitionStatistics.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.util.BitSet;
+
+public class IntakePartitionStatistics {
+
+ public static int ACK_WINDOW_SIZE = 1024;
+ private BitSet bitSet;
+
+ public IntakePartitionStatistics(int partition, int base) {
+ this.bitSet = new BitSet(ACK_WINDOW_SIZE);
+ }
+
+ public void ackRecordId(int recordId) {
+ int posIndexWithinBase = recordId % ACK_WINDOW_SIZE;
+ this.bitSet.set(posIndexWithinBase);
+ }
+
+ public byte[] getAckInfo() {
+ return bitSet.toByteArray();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakeSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakeSideMonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakeSideMonitoredBuffer.java
new file mode 100644
index 0000000..7a79e23
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakeSideMonitoredBuffer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.external.feed.api.IExceptionHandler;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector;
+import org.apache.asterix.external.feed.api.IFrameEventCallback;
+import org.apache.asterix.external.feed.api.IFramePostProcessor;
+import org.apache.asterix.external.feed.api.IFramePreprocessor;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class IntakeSideMonitoredBuffer extends MonitoredBuffer {
+
+ public IntakeSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
+ FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
+ FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
+ IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
+ super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
+ exceptionHandler, callback, nPartitions, policyAccessor);
+ }
+
+ @Override
+ protected boolean monitorProcessingRate() {
+ return false;
+ }
+
+ @Override
+ protected boolean logInflowOutflowRate() {
+ return false;
+ }
+
+ @Override
+ protected IFramePreprocessor getFramePreProcessor() {
+ return null;
+ }
+
+ @Override
+ protected IFramePostProcessor getFramePostProcessor() {
+ return null;
+ }
+
+ @Override
+ protected boolean monitorInputQueueLength() {
+ return false;
+ }
+
+ @Override
+ protected boolean reportOutflowRate() {
+ return false;
+ }
+
+ @Override
+ protected boolean reportInflowRate() {
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
new file mode 100644
index 0000000..db38edf
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.logging.Level;
+
+import org.apache.asterix.external.feed.api.IExceptionHandler;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector;
+import org.apache.asterix.external.feed.api.IFrameEventCallback;
+import org.apache.asterix.external.feed.api.IFramePostProcessor;
+import org.apache.asterix.external.feed.api.IFramePreprocessor;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector.ValueType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.api.IFrameEventCallback.FrameEvent;
+import org.apache.asterix.external.feed.dataflow.DataBucket;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.dataflow.StorageFrameHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.message.MessageReceiver;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.LogInputOutputRateTask;
+import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitorInputQueueLengthTimerTask;
+import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitoreProcessRateTimerTask;
+import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
+import org.apache.asterix.external.util.FeedFrameUtil;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public abstract class MonitoredBuffer extends MessageReceiver<DataBucket> {
+
+ protected static final long LOG_INPUT_OUTPUT_RATE_FREQUENCY = 5000; // 5 seconds
+ protected static final long INPUT_QUEUE_MEASURE_FREQUENCY = 1000; // 1 second
+ protected static final long PROCESSING_RATE_MEASURE_FREQUENCY = 10000; // 10 seconds
+
+ protected static final int PROCESS_RATE_REFRESH = 2; // refresh processing rate every 10th frame
+
+ protected final IHyracksTaskContext ctx;
+ protected final FeedConnectionId connectionId;
+ protected final FeedRuntimeId runtimeId;
+ protected final FrameTupleAccessor inflowFta;
+ protected final FrameTupleAccessor outflowFta;
+ protected final FeedRuntimeInputHandler inputHandler;
+ protected final IFrameEventCallback callback;
+ protected final Timer timer;
+ private final IExceptionHandler exceptionHandler;
+ protected final FeedPolicyAccessor policyAccessor;
+ protected int nPartitions;
+
+ private IFrameWriter frameWriter;
+ protected IFeedMetricCollector metricCollector;
+ protected boolean monitorProcessingRate = false;
+ protected boolean monitorInputQueueLength = false;
+ protected boolean logInflowOutflowRate = false;
+ protected boolean reportOutflowRate = false;
+ protected boolean reportInflowRate = false;
+
+ protected int inflowReportSenderId = -1;
+ protected int outflowReportSenderId = -1;
+ protected TimerTask monitorInputQueueLengthTask;
+ protected TimerTask processingRateTask;
+ protected TimerTask logInflowOutflowRateTask;
+ protected MonitoredBufferStorageTimerTask storageTimeTrackingRateTask;
+ protected StorageFrameHandler storageFromeHandler;
+
+ protected int processingRate = -1;
+ protected int frameCount = 0;
+ private long avgDelayPersistence = 0;
+ private boolean active;
+ private Map<Integer, Long> tupleTimeStats;
+ IFramePostProcessor postProcessor = null;
+ IFramePreprocessor preProcessor = null;
+
+ public static MonitoredBuffer getMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
+ IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+ IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+ IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
+ FeedPolicyAccessor policyAccessor) {
+ switch (runtimeId.getFeedRuntimeType()) {
+ case COMPUTE:
+ return new ComputeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
+ connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
+ case STORE:
+ return new StorageSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
+ connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
+ case COLLECT:
+ return new IntakeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
+ connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
+ default:
+ return new BasicMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
+ connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
+ }
+ }
+
+ protected MonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
+ FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
+ FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
+ IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
+ this.ctx = ctx;
+ this.connectionId = connectionId;
+ this.frameWriter = frameWriter;
+ this.inflowFta = new FrameTupleAccessor(recordDesc);
+ this.outflowFta = new FrameTupleAccessor(recordDesc);
+ this.runtimeId = runtimeId;
+ this.metricCollector = metricCollector;
+ this.exceptionHandler = exceptionHandler;
+ this.callback = callback;
+ this.inputHandler = inputHandler;
+ this.timer = new Timer();
+ this.policyAccessor = policyAccessor;
+ this.nPartitions = nPartitions;
+ this.active = true;
+ initializeMonitoring();
+ }
+
+ protected abstract boolean monitorProcessingRate();
+
+ protected abstract boolean logInflowOutflowRate();
+
+ protected abstract boolean reportOutflowRate();
+
+ protected abstract boolean reportInflowRate();
+
+ protected abstract boolean monitorInputQueueLength();
+
+ protected abstract IFramePreprocessor getFramePreProcessor();
+
+ protected abstract IFramePostProcessor getFramePostProcessor();
+
+ protected void initializeMonitoring() {
+ monitorProcessingRate = monitorProcessingRate();
+ monitorInputQueueLength = monitorInputQueueLength();
+ reportInflowRate = reportInflowRate();
+ reportOutflowRate = reportOutflowRate();
+ logInflowOutflowRate = policyAccessor.isLoggingStatisticsEnabled() || logInflowOutflowRate();
+
+ if (monitorProcessingRate && policyAccessor.isElastic()) { // check possibility to scale in
+ this.processingRateTask = new MonitoreProcessRateTimerTask(this, inputHandler.getFeedManager(),
+ connectionId, nPartitions);
+ this.timer.scheduleAtFixedRate(processingRateTask, 0, PROCESSING_RATE_MEASURE_FREQUENCY);
+ }
+
+ if (monitorInputQueueLength && (policyAccessor.isElastic() || policyAccessor.throttlingEnabled()
+ || policyAccessor.spillToDiskOnCongestion() || policyAccessor.discardOnCongestion())) {
+ this.monitorInputQueueLengthTask = new MonitorInputQueueLengthTimerTask(this, callback);
+ this.timer.scheduleAtFixedRate(monitorInputQueueLengthTask, 0, INPUT_QUEUE_MEASURE_FREQUENCY);
+ }
+
+ if (logInflowOutflowRate || reportInflowRate || reportOutflowRate) {
+ this.logInflowOutflowRateTask = new LogInputOutputRateTask(this, logInflowOutflowRate, reportInflowRate,
+ reportOutflowRate);
+ this.timer.scheduleAtFixedRate(logInflowOutflowRateTask, 0, LOG_INPUT_OUTPUT_RATE_FREQUENCY);
+ this.inflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
+ ValueType.INFLOW_RATE, MetricType.RATE);
+ this.outflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
+ ValueType.OUTFLOW_RATE, MetricType.RATE);
+ }
+ }
+
+ protected void deinitializeMonitoring() {
+ if (monitorInputQueueLengthTask != null) {
+ monitorInputQueueLengthTask.cancel();
+ }
+ if (processingRateTask != null) {
+ processingRateTask.cancel();
+ }
+ if (logInflowOutflowRate || reportInflowRate || reportOutflowRate) {
+ metricCollector.removeReportSender(inflowReportSenderId);
+ metricCollector.removeReportSender(outflowReportSenderId);
+ logInflowOutflowRateTask.cancel();
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Disabled monitoring for " + this.runtimeId);
+ }
+ }
+
+ protected void postProcessFrame(long startTime, ByteBuffer frame) throws Exception {
+ if (monitorProcessingRate) {
+ frameCount++;
+ if (frameCount % PROCESS_RATE_REFRESH == 0) {
+ long endTime = System.currentTimeMillis();
+ processingRate = (int) ((double) outflowFta.getTupleCount() * 1000 / (endTime - startTime));
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Processing Rate :" + processingRate + " tuples/sec");
+ }
+ frameCount = 0;
+ }
+ }
+
+ if (logInflowOutflowRate || reportOutflowRate) {
+ metricCollector.sendReport(outflowReportSenderId, outflowFta.getTupleCount());
+ }
+
+ postProcessFrame(frame);
+
+ }
+
+ protected void preProcessFrame(ByteBuffer frame) throws Exception {
+ if (preProcessor == null) {
+ preProcessor = getFramePreProcessor();
+ }
+ if (preProcessor != null) {
+ preProcessor.preProcess(frame);
+ }
+ }
+
+ protected void postProcessFrame(ByteBuffer frame) throws Exception {
+ if (postProcessor == null) {
+ postProcessor = getFramePostProcessor();
+ }
+ if (postProcessor != null) {
+ outflowFta.reset(frame);
+ postProcessor.postProcessFrame(frame, outflowFta);
+ }
+ }
+
+ @Override
+ public void sendMessage(DataBucket message) {
+ inbox.add(message);
+ }
+
+ public void sendReport(ByteBuffer frame) {
+ if ((logInflowOutflowRate || reportInflowRate) && !(inputHandler.getMode().equals(Mode.PROCESS_BACKLOG)
+ || inputHandler.getMode().equals(Mode.PROCESS_SPILL))) {
+ inflowFta.reset(frame);
+ metricCollector.sendReport(inflowReportSenderId, inflowFta.getTupleCount());
+ }
+ }
+
+ /** return rate in terms of tuples/sec **/
+ public int getInflowRate() {
+ return metricCollector.getMetric(inflowReportSenderId);
+ }
+
+ /** return rate in terms of tuples/sec **/
+ public int getOutflowRate() {
+ return metricCollector.getMetric(outflowReportSenderId);
+ }
+
+ /** return the number of pending frames from the input queue **/
+ public int getWorkSize() {
+ return inbox.size();
+ }
+
+ /** reset the number of partitions (cardinality) for the runtime **/
+ public void setNumberOfPartitions(int nPartitions) {
+ if (processingRateTask != null) {
+ int currentPartitions = ((MonitoreProcessRateTimerTask) processingRateTask).getNumberOfPartitions();
+ if (currentPartitions != nPartitions) {
+ ((MonitoreProcessRateTimerTask) processingRateTask).setNumberOfPartitions(nPartitions);
+ }
+ }
+ }
+
+ public FeedRuntimeInputHandler getInputHandler() {
+ return inputHandler;
+ }
+
+ public synchronized void close(boolean processPending, boolean disableMonitoring) {
+ super.close(processPending);
+ if (disableMonitoring) {
+ deinitializeMonitoring();
+ }
+ active = false;
+ }
+
+ @Override
+ public synchronized void processMessage(DataBucket message) throws Exception {
+ if (!active) {
+ message.doneReading();
+ return;
+ }
+ switch (message.getContentType()) {
+ case DATA:
+ boolean finishedProcessing = false;
+ ByteBuffer frameReceived = message.getContent();
+ ByteBuffer frameToProcess = null;
+ if (inputHandler.isThrottlingEnabled()) {
+ inflowFta.reset(frameReceived);
+ int pRate = getProcessingRate();
+ int inflowRate = getInflowRate();
+ if (inflowRate > pRate) {
+ double retainFraction = (pRate * 0.8 / inflowRate);
+ frameToProcess = throttleFrame(inflowFta, retainFraction);
+ inflowFta.reset(frameToProcess);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Throttling at fraction " + retainFraction + "inflow rate " + inflowRate
+ + " no of tuples remaining " + inflowFta.getTupleCount());
+
+ }
+ } else {
+ frameToProcess = frameReceived;
+ }
+ } else {
+ frameToProcess = frameReceived;
+ }
+ outflowFta.reset(frameToProcess);
+ long startTime = 0;
+ while (!finishedProcessing) {
+ try {
+ inflowFta.reset(frameToProcess);
+ startTime = System.currentTimeMillis();
+ preProcessFrame(frameToProcess);
+ frameWriter.nextFrame(frameToProcess);
+ postProcessFrame(startTime, frameToProcess);
+ finishedProcessing = true;
+ } catch (Exception e) {
+ e.printStackTrace();
+ frameToProcess = exceptionHandler.handleException(e, frameToProcess);
+ finishedProcessing = true;
+ }
+ }
+ message.doneReading();
+ break;
+ case EOD:
+ message.doneReading();
+ timer.cancel();
+ callback.frameEvent(FrameEvent.FINISHED_PROCESSING);
+ break;
+ case EOSD:
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Done processing spillage");
+ }
+ message.doneReading();
+ callback.frameEvent(FrameEvent.FINISHED_PROCESSING_SPILLAGE);
+ break;
+
+ }
+ }
+
+ private ByteBuffer throttleFrame(FrameTupleAccessor fta, double retainFraction) throws HyracksDataException {
+ int desiredTuples = (int) (fta.getTupleCount() * retainFraction);
+ return FeedFrameUtil.getSampledFrame(ctx, fta, desiredTuples);
+ }
+
+ public Mode getMode() {
+ return inputHandler.getMode();
+ }
+
+ public FeedRuntimeId getRuntimeId() {
+ return runtimeId;
+ }
+
+ public void setFrameWriter(IFrameWriter frameWriter) {
+ this.frameWriter = frameWriter;
+ }
+
+ public void reset() {
+ active = true;
+ if (logInflowOutflowRate) {
+ metricCollector.resetReportSender(inflowReportSenderId);
+ metricCollector.resetReportSender(outflowReportSenderId);
+ }
+ }
+
+ public int getProcessingRate() {
+ return processingRate;
+ }
+
+ public Map<Integer, Long> getTupleTimeStats() {
+ return tupleTimeStats;
+ }
+
+ public long getAvgDelayRecordPersistence() {
+ return avgDelayPersistence;
+ }
+
+ public MonitoredBufferStorageTimerTask getStorageTimeTrackingRateTask() {
+ return storageTimeTrackingRateTask;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java
new file mode 100644
index 0000000..86c6bca
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.AsterixFeedProperties;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedMessageService;
+import org.apache.asterix.external.feed.api.IFrameEventCallback;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector.ValueType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.api.IFrameEventCallback.FrameEvent;
+import org.apache.asterix.external.feed.dataflow.StorageFrameHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.message.FeedReportMessage;
+import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
+import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
+import org.apache.asterix.external.feed.message.ScaleInReportMessage;
+import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+
+public class MonitoredBufferTimerTasks {
+
+ private static final Logger LOGGER = Logger.getLogger(MonitorInputQueueLengthTimerTask.class.getName());
+
+ public static class MonitoredBufferStorageTimerTask extends TimerTask {
+
+ private static final int PERSISTENCE_DELAY_VIOLATION_MAX = 5;
+
+ private final StorageSideMonitoredBuffer mBuffer;
+ private final IFeedManager feedManager;
+ private final int partition;
+ private final FeedConnectionId connectionId;
+ private final FeedPolicyAccessor policyAccessor;
+ private final StorageFrameHandler storageFromeHandler;
+ private final StorageReportFeedMessage storageReportMessage;
+ private final FeedTupleCommitAckMessage tupleCommitAckMessage;
+
+ private Map<Integer, Integer> maxIntakeBaseCovered;
+ private int countDelayExceeded = 0;
+
+ public MonitoredBufferStorageTimerTask(StorageSideMonitoredBuffer mBuffer, IFeedManager feedManager,
+ FeedConnectionId connectionId, int partition, FeedPolicyAccessor policyAccessor,
+ StorageFrameHandler storageFromeHandler) {
+ this.mBuffer = mBuffer;
+ this.feedManager = feedManager;
+ this.connectionId = connectionId;
+ this.partition = partition;
+ this.policyAccessor = policyAccessor;
+ this.storageFromeHandler = storageFromeHandler;
+ this.storageReportMessage = new StorageReportFeedMessage(this.connectionId, this.partition, 0, false, 0, 0);
+ this.tupleCommitAckMessage = new FeedTupleCommitAckMessage(this.connectionId, 0, 0, null);
+ this.maxIntakeBaseCovered = new HashMap<Integer, Integer>();
+ }
+
+ @Override
+ public void run() {
+ if (mBuffer.isAckingEnabled() && !mBuffer.getInputHandler().isThrottlingEnabled()) {
+ ackRecords();
+ }
+ if (mBuffer.isTimeTrackingEnabled()) {
+ checkLatencyViolation();
+ }
+ }
+
+ private void ackRecords() {
+ Set<Integer> partitions = storageFromeHandler.getPartitionsWithStats();
+ List<Integer> basesCovered = new ArrayList<Integer>();
+ for (int intakePartition : partitions) {
+ Map<Integer, IntakePartitionStatistics> baseAcks = storageFromeHandler
+ .getBaseAcksForPartition(intakePartition);
+ for (Entry<Integer, IntakePartitionStatistics> entry : baseAcks.entrySet()) {
+ int base = entry.getKey();
+ IntakePartitionStatistics stats = entry.getValue();
+ Integer maxIntakeBaseForPartition = maxIntakeBaseCovered.get(intakePartition);
+ if (maxIntakeBaseForPartition == null || maxIntakeBaseForPartition < base) {
+ tupleCommitAckMessage.reset(intakePartition, base, stats.getAckInfo());
+ feedManager.getFeedMessageService().sendMessage(tupleCommitAckMessage);
+ } else {
+ basesCovered.add(base);
+ }
+ }
+ for (Integer b : basesCovered) {
+ baseAcks.remove(b);
+ }
+ basesCovered.clear();
+ }
+ }
+
+ private void checkLatencyViolation() {
+ long avgDelayPersistence = storageFromeHandler.getAvgDelayPersistence();
+ if (avgDelayPersistence > policyAccessor.getMaxDelayRecordPersistence()) {
+ countDelayExceeded++;
+ if (countDelayExceeded > PERSISTENCE_DELAY_VIOLATION_MAX) {
+ storageReportMessage.reset(0, false, mBuffer.getAvgDelayRecordPersistence());
+ feedManager.getFeedMessageService().sendMessage(storageReportMessage);
+ }
+ } else {
+ countDelayExceeded = 0;
+ }
+ }
+
+ public void receiveCommitAckResponse(FeedTupleCommitResponseMessage message) {
+ maxIntakeBaseCovered.put(message.getIntakePartition(), message.getMaxWindowAcked());
+ }
+ }
+
+ public static class LogInputOutputRateTask extends TimerTask {
+
+ private final MonitoredBuffer mBuffer;
+ private final boolean log;
+ private final boolean reportInflow;
+ private final boolean reportOutflow;
+
+ private final IFeedMessageService messageService;
+ private final FeedReportMessage message;
+
+ public LogInputOutputRateTask(MonitoredBuffer mBuffer, boolean log, boolean reportInflow, boolean reportOutflow) {
+ this.mBuffer = mBuffer;
+ this.log = log;
+ this.reportInflow = reportInflow;
+ this.reportOutflow = reportOutflow;
+ if (reportInflow || reportOutflow) {
+ ValueType vType = reportInflow ? ValueType.INFLOW_RATE : ValueType.OUTFLOW_RATE;
+ messageService = mBuffer.getInputHandler().getFeedManager().getFeedMessageService();
+ message = new FeedReportMessage(mBuffer.getInputHandler().getConnectionId(), mBuffer.getRuntimeId(),
+ vType, 0);
+ } else {
+ messageService = null;
+ message = null;
+ }
+
+ }
+
+ @Override
+ public void run() {
+ int pendingWork = mBuffer.getWorkSize();
+ int outflowRate = mBuffer.getOutflowRate();
+ int inflowRate = mBuffer.getInflowRate();
+ if (log) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(mBuffer.getRuntimeId() + " " + "Inflow rate:" + inflowRate + " Outflow Rate:"
+ + outflowRate + " Pending Work " + pendingWork);
+ }
+ }
+ if (reportInflow) {
+ message.reset(inflowRate);
+ } else if (reportOutflow) {
+ message.reset(outflowRate);
+ }
+ messageService.sendMessage(message);
+ }
+ }
+
+ public static class MonitorInputQueueLengthTimerTask extends TimerTask {
+
+ private final MonitoredBuffer mBuffer;
+ private final IFrameEventCallback callback;
+ private final int pendingWorkThreshold;
+ private final int maxSuccessiveThresholdPeriods;
+ private FrameEvent lastEvent = FrameEvent.NO_OP;
+ private int pendingWorkExceedCount = 0;
+
+ public MonitorInputQueueLengthTimerTask(MonitoredBuffer mBuffer, IFrameEventCallback callback) {
+ this.mBuffer = mBuffer;
+ this.callback = callback;
+ AsterixFeedProperties props = mBuffer.getInputHandler().getFeedManager().getAsterixFeedProperties();
+ pendingWorkThreshold = props.getPendingWorkThreshold();
+ maxSuccessiveThresholdPeriods = props.getMaxSuccessiveThresholdPeriod();
+ }
+
+ @Override
+ public void run() {
+ int pendingWork = mBuffer.getWorkSize();
+ if (mBuffer.getMode().equals(Mode.PROCESS_SPILL) || mBuffer.getMode().equals(Mode.PROCESS_BACKLOG)) {
+ return;
+ }
+
+ switch (lastEvent) {
+ case NO_OP:
+ case PENDING_WORK_DONE:
+ case FINISHED_PROCESSING_SPILLAGE:
+ if (pendingWork > pendingWorkThreshold) {
+ pendingWorkExceedCount++;
+ if (pendingWorkExceedCount > maxSuccessiveThresholdPeriods) {
+ pendingWorkExceedCount = 0;
+ lastEvent = FrameEvent.PENDING_WORK_THRESHOLD_REACHED;
+ callback.frameEvent(lastEvent);
+ }
+ } else if (pendingWork == 0 && mBuffer.getMode().equals(Mode.SPILL)) {
+ lastEvent = FrameEvent.PENDING_WORK_DONE;
+ callback.frameEvent(lastEvent);
+ }
+ break;
+ case PENDING_WORK_THRESHOLD_REACHED:
+ if (((pendingWork * 1.0) / pendingWorkThreshold) <= 0.5) {
+ lastEvent = FrameEvent.PENDING_WORK_DONE;
+ callback.frameEvent(lastEvent);
+ }
+ break;
+ case FINISHED_PROCESSING:
+ break;
+
+ }
+ }
+ }
+
+ /**
+ * A timer task to measure and compare the processing rate and inflow rate
+ * to look for possibility to scale-in, that is reduce the degree of cardinality
+ * of the compute operator.
+ */
+ public static class MonitoreProcessRateTimerTask extends TimerTask {
+
+ private final MonitoredBuffer mBuffer;
+ private final IFeedManager feedManager;
+ private int nPartitions;
+ private ScaleInReportMessage sMessage;
+ private boolean proposedChange;
+
+ public MonitoreProcessRateTimerTask(MonitoredBuffer mBuffer, IFeedManager feedManager,
+ FeedConnectionId connectionId, int nPartitions) {
+ this.mBuffer = mBuffer;
+ this.feedManager = feedManager;
+ this.nPartitions = nPartitions;
+ this.sMessage = new ScaleInReportMessage(connectionId, FeedRuntimeType.COMPUTE, 0, 0);
+ this.proposedChange = false;
+ }
+
+ public int getNumberOfPartitions() {
+ return nPartitions;
+ }
+
+ public void setNumberOfPartitions(int nPartitions) {
+ this.nPartitions = nPartitions;
+ proposedChange = false;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Reset the number of partitions for " + mBuffer.getRuntimeId() + " to " + nPartitions);
+ }
+ }
+
+ @Override
+ public void run() {
+ if (!proposedChange) {
+ int inflowRate = mBuffer.getInflowRate();
+ int procRate = mBuffer.getProcessingRate();
+ if (inflowRate > 0 && procRate > 0) {
+ if (inflowRate < procRate) {
+ int possibleCardinality = (int) Math.ceil(nPartitions * inflowRate / (double) procRate);
+ if (possibleCardinality < nPartitions
+ && ((((nPartitions - possibleCardinality) * 1.0) / nPartitions) >= 0.25)) {
+ sMessage.reset(nPartitions, possibleCardinality);
+ feedManager.getFeedMessageService().sendMessage(sMessage);
+ proposedChange = true;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Proposed scale-in " + sMessage);
+ }
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Inflow Rate (" + inflowRate + ") exceeds Processing Rate" + " (" + procRate
+ + ")");
+ }
+ }
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Waiting for earlier proposal to scale in to be applied");
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java
new file mode 100644
index 0000000..d3919b5
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+
+public class NodeLoad implements Comparable<NodeLoad> {
+
+ private final String nodeId;
+
+ private int nRuntimes;
+
+ public NodeLoad(String nodeId) {
+ this.nodeId = nodeId;
+ this.nRuntimes = 0;
+ }
+
+ public void addLoad() {
+ nRuntimes++;
+ }
+
+ public void removeLoad(FeedRuntimeType runtimeType) {
+ nRuntimes--;
+ }
+
+ @Override
+ public int compareTo(NodeLoad o) {
+ if (this == o) {
+ return 0;
+ }
+ return nRuntimes - o.getnRuntimes();
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public int getnRuntimes() {
+ return nRuntimes;
+ }
+
+ public void setnRuntimes(int nRuntimes) {
+ this.nRuntimes = nRuntimes;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java
new file mode 100644
index 0000000..bfddcf6
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.external.util.FeedConstants;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class NodeLoadReport implements Comparable<NodeLoadReport> {
+
+ private final String nodeId;
+ private float cpuLoad;
+ private double usedHeap;
+ private int nRuntimes;
+
+ public NodeLoadReport(String nodeId, float cpuLoad, float usedHeap, int nRuntimes) {
+ this.nodeId = nodeId;
+ this.cpuLoad = cpuLoad;
+ this.usedHeap = usedHeap;
+ this.nRuntimes = nRuntimes;
+ }
+
+ public static NodeLoadReport read(JSONObject obj) throws JSONException {
+ NodeLoadReport r = new NodeLoadReport(obj.getString(FeedConstants.MessageConstants.NODE_ID),
+ (float) obj.getDouble(FeedConstants.MessageConstants.CPU_LOAD),
+ (float) obj.getDouble(FeedConstants.MessageConstants.HEAP_USAGE),
+ obj.getInt(FeedConstants.MessageConstants.N_RUNTIMES));
+ return r;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof NodeLoadReport)) {
+ return false;
+ }
+ return ((NodeLoadReport) o).nodeId.equals(nodeId);
+ }
+
+ @Override
+ public int hashCode() {
+ return nodeId.hashCode();
+ }
+
+ @Override
+ public int compareTo(NodeLoadReport o) {
+ if (nRuntimes != o.getnRuntimes()) {
+ return nRuntimes - o.getnRuntimes();
+ } else {
+ return (int) (this.cpuLoad - ((NodeLoadReport) o).cpuLoad);
+ }
+ }
+
+ public float getCpuLoad() {
+ return cpuLoad;
+ }
+
+ public void setCpuLoad(float cpuLoad) {
+ this.cpuLoad = cpuLoad;
+ }
+
+ public double getUsedHeap() {
+ return usedHeap;
+ }
+
+ public void setUsedHeap(double usedHeap) {
+ this.usedHeap = usedHeap;
+ }
+
+ public int getnRuntimes() {
+ return nRuntimes;
+ }
+
+ public void setnRuntimes(int nRuntimes) {
+ this.nRuntimes = nRuntimes;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java
new file mode 100644
index 0000000..f651935
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.OperatingSystemMXBean;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedMessageService;
+import org.apache.asterix.external.feed.api.IFeedService;
+import org.apache.asterix.external.feed.message.NodeReportMessage;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+
+public class NodeLoadReportService implements IFeedService {
+
+ private static final int NODE_LOAD_REPORT_FREQUENCY = 2000;
+ private static final float CPU_CHANGE_THRESHOLD = 0.2f;
+ private static final float HEAP_CHANGE_THRESHOLD = 0.4f;
+
+ private final NodeLoadReportTask task;
+ private final Timer timer;
+
+ public NodeLoadReportService(String nodeId, IFeedManager feedManager) {
+ this.task = new NodeLoadReportTask(nodeId, feedManager);
+ this.timer = new Timer();
+ }
+
+ @Override
+ public void start() throws Exception {
+ timer.schedule(task, 0, NODE_LOAD_REPORT_FREQUENCY);
+ }
+
+ @Override
+ public void stop() {
+ timer.cancel();
+ }
+
+ private static class NodeLoadReportTask extends TimerTask {
+
+ private final IFeedManager feedManager;
+ private final NodeReportMessage message;
+ private final IFeedMessageService messageService;
+
+ private static OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
+ private static MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
+
+ public NodeLoadReportTask(String nodeId, IFeedManager feedManager) {
+ this.feedManager = feedManager;
+ this.message = new NodeReportMessage(0.0f, 0L, 0);
+ this.messageService = feedManager.getFeedMessageService();
+ }
+
+ @Override
+ public void run() {
+ List<FeedRuntimeId> runtimeIds = feedManager.getFeedConnectionManager().getRegisteredRuntimes();
+ int nRuntimes = runtimeIds.size();
+ double cpuLoad = getCpuLoad();
+ double usedHeap = getUsedHeap();
+ if (sendMessage(nRuntimes, cpuLoad, usedHeap)) {
+ message.reset(cpuLoad, usedHeap, nRuntimes);
+ messageService.sendMessage(message);
+ }
+ }
+
+ private boolean sendMessage(int nRuntimes, double cpuLoad, double usedHeap) {
+ if (message == null) {
+ return true;
+ }
+
+ boolean changeInCpu = (Math.abs(cpuLoad - message.getCpuLoad())
+ / message.getCpuLoad()) > CPU_CHANGE_THRESHOLD;
+ boolean changeInUsedHeap = (Math.abs(usedHeap - message.getUsedHeap())
+ / message.getUsedHeap()) > HEAP_CHANGE_THRESHOLD;
+ boolean changeInRuntimeSize = nRuntimes != message.getnRuntimes();
+ return changeInCpu || changeInUsedHeap || changeInRuntimeSize;
+ }
+
+ private double getCpuLoad() {
+ return osBean.getSystemLoadAverage();
+ }
+
+ private double getUsedHeap() {
+ return ((double) memBean.getHeapMemoryUsage().getUsed()) / memBean.getHeapMemoryUsage().getMax();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java
new file mode 100644
index 0000000..ec95371
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
+
+public abstract class Series {
+
+ protected final MetricType type;
+ protected int runningSum;
+
+ public Series(MetricType type) {
+ this.type = type;
+ }
+
+ public abstract void addValue(int value);
+
+ public int getRunningSum() {
+ return runningSum;
+ }
+
+ public MetricType getType() {
+ return type;
+ }
+
+ public abstract void reset();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java
new file mode 100644
index 0000000..f75379d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
+
+public class SeriesAvg extends Series {
+
+ private int count;
+
+ public SeriesAvg() {
+ super(MetricType.AVG);
+ }
+
+ public int getAvg() {
+ return runningSum / count;
+ }
+
+ public synchronized void addValue(int value) {
+ if (value < 0) {
+ return;
+ }
+ runningSum += value;
+ count++;
+ }
+
+ public void reset(){
+ count = 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java
new file mode 100644
index 0000000..91eea87
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
+
+public class SeriesRate extends Series {
+
+ private static final long REFRESH_MEASUREMENT = 5000; // 5 seconds
+
+ private int rate;
+ private Timer timer;
+ private RateComputingTask task;
+
+ public SeriesRate() {
+ super(MetricType.RATE);
+ begin();
+ }
+
+ public int getRate() {
+ return rate;
+ }
+
+ public synchronized void addValue(int value) {
+ if (value < 0) {
+ return;
+ }
+ runningSum += value;
+ }
+
+ public void begin() {
+ if (timer == null) {
+ timer = new Timer();
+ task = new RateComputingTask(this);
+ timer.scheduleAtFixedRate(task, 0, REFRESH_MEASUREMENT);
+ }
+ }
+
+ public void end() {
+ if (timer != null) {
+ timer.cancel();
+ }
+ }
+
+ public void reset() {
+ rate = 0;
+ if (task != null) {
+ task.reset();
+ }
+ }
+
+ private class RateComputingTask extends TimerTask {
+
+ private int lastMeasured = 0;
+ private final SeriesRate series;
+
+ public RateComputingTask(SeriesRate series) {
+ this.series = series;
+ }
+
+ @Override
+ public void run() {
+ int currentValue = series.getRunningSum();
+ rate = (int) (((currentValue - lastMeasured) * 1000) / REFRESH_MEASUREMENT);
+ lastMeasured = currentValue;
+ }
+
+ public void reset() {
+ lastMeasured = 0;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
new file mode 100644
index 0000000..1f9551d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.external.feed.api.IExceptionHandler;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector;
+import org.apache.asterix.external.feed.api.IFrameEventCallback;
+import org.apache.asterix.external.feed.api.IFramePostProcessor;
+import org.apache.asterix.external.feed.api.IFramePreprocessor;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.dataflow.StorageFrameHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class StorageSideMonitoredBuffer extends MonitoredBuffer {
+
+ private static final long STORAGE_TIME_TRACKING_FREQUENCY = 5000; // 10
+ // seconds
+
+ private boolean ackingEnabled;
+ private final boolean timeTrackingEnabled;
+
+ public StorageSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
+ IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+ IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+ IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
+ FeedPolicyAccessor policyAccessor) {
+ super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
+ exceptionHandler, callback, nPartitions, policyAccessor);
+ timeTrackingEnabled = policyAccessor.isTimeTrackingEnabled();
+ ackingEnabled = policyAccessor.atleastOnceSemantics();
+ if (ackingEnabled || timeTrackingEnabled) {
+ storageFromeHandler = new StorageFrameHandler();
+ this.storageTimeTrackingRateTask = new MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask(this,
+ inputHandler.getFeedManager(), connectionId, runtimeId.getPartition(), policyAccessor,
+ storageFromeHandler);
+ this.timer.scheduleAtFixedRate(storageTimeTrackingRateTask, 0, STORAGE_TIME_TRACKING_FREQUENCY);
+ }
+ }
+
+ @Override
+ protected boolean monitorProcessingRate() {
+ return false;
+ }
+
+ @Override
+ protected boolean logInflowOutflowRate() {
+ return true;
+ }
+
+ @Override
+ public IFramePreprocessor getFramePreProcessor() {
+ return new IFramePreprocessor() {
+
+ @Override
+ public void preProcess(ByteBuffer frame) {
+ try {
+ if (ackingEnabled) {
+ storageFromeHandler.updateTrackingInformation(frame, inflowFta);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ }
+
+ @Override
+ protected IFramePostProcessor getFramePostProcessor() {
+ return new IFramePostProcessor() {
+
+ private static final long NORMAL_WINDOW_LIMIT = 400 * 1000;
+ private static final long HIGH_WINDOW_LIMIT = 800 * 1000;
+
+ private long delayNormalWindow = 0;
+ private long delayHighWindow = 0;
+ private long delayLowWindow = 0;
+
+ private int countNormalWindow;
+ private int countHighWindow;
+ private int countLowWindow;
+
+ private long beginIntakeTimestamp = 0;
+
+ @Override
+ public void postProcessFrame(ByteBuffer frame, FrameTupleAccessor frameAccessor) {
+ if (ackingEnabled || timeTrackingEnabled) {
+ int nTuples = frameAccessor.getTupleCount();
+ long intakeTimestamp;
+ long currentTime = System.currentTimeMillis();
+ for (int i = 0; i < nTuples; i++) {
+ int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
+ int openPartOffsetOrig = frame.getInt(recordStart + 6);
+ int numOpenFields = frame.getInt(recordStart + openPartOffsetOrig);
+
+ int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
+ + (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
+
+ int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2)
+ + 1;
+
+ int intakeTimestampValueOffset = partitionOffset + 4
+ + (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2) + 1;
+ intakeTimestamp = frame.getLong(recordStart + intakeTimestampValueOffset);
+ if (beginIntakeTimestamp == 0) {
+ beginIntakeTimestamp = intakeTimestamp;
+ LOGGER.warning("Begin Timestamp: " + beginIntakeTimestamp);
+ }
+
+ updateRunningAvg(intakeTimestamp, currentTime);
+
+ int storeTimestampValueOffset = intakeTimestampValueOffset + 8
+ + (StatisticsConstants.STORE_TIMESTAMP.length() + 2) + 1;
+ frame.putLong(recordStart + storeTimestampValueOffset, System.currentTimeMillis());
+ }
+ logRunningAvg();
+ resetRunningAvg();
+ }
+ }
+
+ private void updateRunningAvg(long intakeTimestamp, long currentTime) {
+ long diffTimestamp = intakeTimestamp - beginIntakeTimestamp;
+ long delay = (currentTime - intakeTimestamp);
+ if (diffTimestamp < NORMAL_WINDOW_LIMIT) {
+ delayNormalWindow += delay;
+ countNormalWindow++;
+ } else if (diffTimestamp < HIGH_WINDOW_LIMIT) {
+ delayHighWindow += delay;
+ countHighWindow++;
+ } else {
+ delayLowWindow += delay;
+ countLowWindow++;
+ }
+ }
+
+ private void resetRunningAvg() {
+ delayNormalWindow = 0;
+ countNormalWindow = 0;
+ delayHighWindow = 0;
+ countHighWindow = 0;
+ delayLowWindow = 0;
+ countLowWindow = 0;
+ }
+
+ private void logRunningAvg() {
+ if (countNormalWindow != 0 && delayNormalWindow != 0) {
+ LOGGER.warning("Window:" + 0 + ":" + "Avg Travel_Time:" + (delayNormalWindow / countNormalWindow));
+ }
+ if (countHighWindow != 0 && delayHighWindow != 0) {
+ LOGGER.warning("Window:" + 1 + ":" + "Avg Travel_Time:" + (delayHighWindow / countHighWindow));
+ }
+ if (countLowWindow != 0 && delayLowWindow != 0) {
+ LOGGER.warning("Window:" + 2 + ":" + "Avg Travel_Time:" + (delayLowWindow / countLowWindow));
+ }
+ }
+
+ };
+ }
+
+ public boolean isAckingEnabled() {
+ return ackingEnabled;
+ }
+
+ public void setAcking(boolean ackingEnabled) {
+ this.ackingEnabled = ackingEnabled;
+ }
+
+ public boolean isTimeTrackingEnabled() {
+ return timeTrackingEnabled;
+ }
+
+ @Override
+ protected boolean monitorInputQueueLength() {
+ return true;
+ }
+
+ @Override
+ protected boolean reportOutflowRate() {
+ return true;
+ }
+
+ @Override
+ protected boolean reportInflowRate() {
+ return false;
+ }
+
+}