You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:14 UTC
[20/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeId.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeId.java
deleted file mode 100644
index bf3c2c1..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeId.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.Serializable;
-
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-
-public class FeedRuntimeId implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- public static final String DEFAULT_OPERAND_ID = "N/A";
-
- private final FeedRuntimeType runtimeType;
- private final int partition;
- private final String operandId;
-
- public FeedRuntimeId(FeedRuntimeType runtimeType, int partition, String operandId) {
- this.runtimeType = runtimeType;
- this.partition = partition;
- this.operandId = operandId;
- }
-
- @Override
- public String toString() {
- return runtimeType + "[" + partition + "]" + "{" + operandId + "}";
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof FeedRuntimeId)) {
- return false;
- }
- FeedRuntimeId other = (FeedRuntimeId) o;
- return (other.getFeedRuntimeType().equals(runtimeType) && other.getOperandId().equals(operandId) && other
- .getPartition() == partition);
- }
-
- @Override
- public int hashCode() {
- return toString().hashCode();
- }
-
- public FeedRuntimeType getFeedRuntimeType() {
- return runtimeType;
- }
-
- public int getPartition() {
- return partition;
- }
-
- public FeedRuntimeType getRuntimeType() {
- return runtimeType;
- }
-
- public String getOperandId() {
- return operandId;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
deleted file mode 100644
index 6642df1..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.DataBucket.ContentType;
-import org.apache.asterix.common.feeds.api.IExceptionHandler;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedMemoryComponent;
-import org.apache.asterix.common.feeds.api.IFeedMessage;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.common.feeds.message.FeedCongestionMessage;
-import org.apache.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * Provides for error-handling and input-side buffering for a feed runtime.
- */
-public class FeedRuntimeInputHandler implements IFrameWriter {
-
- private static Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
-
- private final FeedConnectionId connectionId;
- private final FeedRuntimeId runtimeId;
- private final FeedPolicyAccessor feedPolicyAccessor;
- private boolean bufferingEnabled;
- private final IExceptionHandler exceptionHandler;
- private final FeedFrameDiscarder discarder;
- private final FeedFrameSpiller spiller;
- private final FeedPolicyAccessor fpa;
- private final IFeedManager feedManager;
-
- private IFrameWriter coreOperator;
- private MonitoredBuffer mBuffer;
- private DataBucketPool pool;
- private FrameCollection frameCollection;
- private Mode mode;
- private Mode lastMode;
- private boolean finished;
- private long nProcessed;
- private boolean throttlingEnabled;
-
- private FrameEventCallback frameEventCallback;
-
- public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
- IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled, FrameTupleAccessor fta,
- RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions) throws IOException {
- this.connectionId = connectionId;
- this.runtimeId = runtimeId;
- this.coreOperator = coreOperator;
- this.bufferingEnabled = bufferingEnabled;
- this.feedPolicyAccessor = fpa;
- this.spiller = new FeedFrameSpiller(ctx, connectionId, runtimeId, fpa);
- this.discarder = new FeedFrameDiscarder(connectionId, runtimeId, fpa, this);
- this.exceptionHandler = new FeedExceptionHandler(ctx, fta, recordDesc, feedManager, connectionId);
- this.mode = Mode.PROCESS;
- this.lastMode = Mode.PROCESS;
- this.finished = false;
- this.fpa = fpa;
- this.feedManager = feedManager;
- this.pool = (DataBucketPool) feedManager.getFeedMemoryManager()
- .getMemoryComponent(IFeedMemoryComponent.Type.POOL);
- this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager()
- .getMemoryComponent(IFeedMemoryComponent.Type.COLLECTION);
- this.frameEventCallback = new FrameEventCallback(fpa, this, coreOperator);
- this.mBuffer = MonitoredBuffer.getMonitoredBuffer(ctx, this, coreOperator, fta, recordDesc,
- feedManager.getFeedMetricCollector(), connectionId, runtimeId, exceptionHandler, frameEventCallback,
- nPartitions, fpa);
- this.mBuffer.start();
- this.throttlingEnabled = false;
- }
-
- @Override
- public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
- try {
- switch (mode) {
- case PROCESS:
- switch (lastMode) {
- case SPILL:
- case POST_SPILL_DISCARD:
- setMode(Mode.PROCESS_SPILL);
- processSpilledBacklog();
- break;
- case STALL:
- setMode(Mode.PROCESS_BACKLOG);
- processBufferredBacklog();
- break;
- default:
- break;
- }
- process(frame);
- break;
- case PROCESS_BACKLOG:
- case PROCESS_SPILL:
- process(frame);
- break;
- case SPILL:
- spill(frame);
- break;
- case DISCARD:
- case POST_SPILL_DISCARD:
- discard(frame);
- break;
- case STALL:
- switch (runtimeId.getFeedRuntimeType()) {
- case COLLECT:
- case COMPUTE_COLLECT:
- case COMPUTE:
- case STORE:
- bufferDataUntilRecovery(frame);
- break;
- default:
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Discarding frame during " + mode + " mode " + this.runtimeId);
- }
- break;
- }
- break;
- case END:
- case FAIL:
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Ignoring incoming tuples in " + mode + " mode");
- }
- break;
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw new HyracksDataException(e);
- }
- }
-
- private void bufferDataUntilRecovery(ByteBuffer frame) throws Exception {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Bufferring data until recovery is complete " + this.runtimeId);
- }
- if (frameCollection == null) {
- this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager()
- .getMemoryComponent(IFeedMemoryComponent.Type.COLLECTION);
- }
- if (frameCollection == null) {
- discarder.processMessage(frame);
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Running low on memory! DISCARDING FRAME ");
- }
- } else {
- boolean success = frameCollection.addFrame(frame);
- if (!success) {
- if (fpa.spillToDiskOnCongestion()) {
- if (frame != null) {
- spiller.processMessage(frame);
- } // TODO handle the else case
- } else {
- discarder.processMessage(frame);
- }
- }
- }
- }
-
- public void reportUnresolvableCongestion() throws HyracksDataException {
- if (this.runtimeId.getFeedRuntimeType().equals(FeedRuntimeType.COMPUTE)) {
- FeedCongestionMessage congestionReport = new FeedCongestionMessage(connectionId, runtimeId,
- mBuffer.getInflowRate(), mBuffer.getOutflowRate(), mode);
- feedManager.getFeedMessageService().sendMessage(congestionReport);
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Congestion reported " + this.connectionId + " " + this.runtimeId);
- }
- } else {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unresolvable congestion at " + this.connectionId + " " + this.runtimeId);
- }
- }
- }
-
- private void processBufferredBacklog() throws HyracksDataException {
- try {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Processing backlog " + this.runtimeId);
- }
-
- if (frameCollection != null) {
- Iterator<ByteBuffer> backlog = frameCollection.getFrameCollectionIterator();
- while (backlog.hasNext()) {
- process(backlog.next());
- nProcessed++;
- }
- DataBucket bucket = pool.getDataBucket();
- bucket.setContentType(ContentType.EOSD);
- bucket.setDesiredReadCount(1);
- mBuffer.sendMessage(bucket);
- feedManager.getFeedMemoryManager().releaseMemoryComponent(frameCollection);
- frameCollection = null;
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw new HyracksDataException(e);
- }
- }
-
- private void processSpilledBacklog() throws HyracksDataException {
- try {
- Iterator<ByteBuffer> backlog = spiller.replayData();
- while (backlog.hasNext()) {
- process(backlog.next());
- nProcessed++;
- }
- DataBucket bucket = pool.getDataBucket();
- bucket.setContentType(ContentType.EOSD);
- bucket.setDesiredReadCount(1);
- mBuffer.sendMessage(bucket);
- spiller.reset();
- } catch (Exception e) {
- e.printStackTrace();
- throw new HyracksDataException(e);
- }
- }
-
- protected void process(ByteBuffer frame) throws HyracksDataException {
- boolean frameProcessed = false;
- while (!frameProcessed) {
- try {
- if (!bufferingEnabled) {
- coreOperator.nextFrame(frame); // synchronous
- mBuffer.sendReport(frame);
- } else {
- DataBucket bucket = pool.getDataBucket();
- if (bucket != null) {
- if (frame != null) {
- bucket.reset(frame); // created a copy here
- bucket.setContentType(ContentType.DATA);
- } else {
- bucket.setContentType(ContentType.EOD);
- }
- bucket.setDesiredReadCount(1);
- mBuffer.sendMessage(bucket);
- mBuffer.sendReport(frame);
- nProcessed++;
- } else {
- if (fpa.spillToDiskOnCongestion()) {
- if (frame != null) {
- boolean spilled = spiller.processMessage(frame);
- if (spilled) {
- setMode(Mode.SPILL);
- } else {
- reportUnresolvableCongestion();
- }
- }
- } else if (fpa.discardOnCongestion()) {
- boolean discarded = discarder.processMessage(frame);
- if (!discarded) {
- reportUnresolvableCongestion();
- }
- } else if (fpa.throttlingEnabled()) {
- setThrottlingEnabled(true);
- } else {
- reportUnresolvableCongestion();
- }
-
- }
- }
- frameProcessed = true;
- } catch (Exception e) {
- if (feedPolicyAccessor.continueOnSoftFailure()) {
- frame = exceptionHandler.handleException(e, frame);
- if (frame == null) {
- frameProcessed = true;
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Encountered exception! " + e.getMessage()
- + "Insufficient information, Cannot extract failing tuple");
- }
- }
- } else {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Ingestion policy does not require recovering from tuple. Feed would terminate");
- }
- mBuffer.close(false);
- throw new HyracksDataException(e);
- }
- }
- }
- }
-
- private void spill(ByteBuffer frame) throws Exception {
- boolean success = spiller.processMessage(frame);
- if (!success) {
- // limit reached
- setMode(Mode.POST_SPILL_DISCARD);
- reportUnresolvableCongestion();
- }
- }
-
- private void discard(ByteBuffer frame) throws Exception {
- boolean success = discarder.processMessage(frame);
- if (!success) { // limit reached
- reportUnresolvableCongestion();
- }
- }
-
- public Mode getMode() {
- return mode;
- }
-
- public synchronized void setMode(Mode mode) {
- if (mode.equals(this.mode)) {
- return;
- }
- this.lastMode = this.mode;
- this.mode = mode;
- if (mode.equals(Mode.END)) {
- this.close();
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Switched from " + lastMode + " to " + mode + " " + this.runtimeId);
- }
- }
-
- @Override
- public void close() {
- if (mBuffer != null) {
- boolean disableMonitoring = !this.mode.equals(Mode.STALL);
- if (frameCollection != null) {
- feedManager.getFeedMemoryManager().releaseMemoryComponent(frameCollection);
- }
- if (pool != null) {
- feedManager.getFeedMemoryManager().releaseMemoryComponent(pool);
- }
- mBuffer.close(false, disableMonitoring);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Closed input side handler for " + this.runtimeId + " disabled monitoring "
- + disableMonitoring + " Mode for runtime " + this.mode);
- }
- }
- }
-
- public IFrameWriter getCoreOperator() {
- return coreOperator;
- }
-
- public void setCoreOperator(IFrameWriter coreOperator) {
- this.coreOperator = coreOperator;
- mBuffer.setFrameWriter(coreOperator);
- frameEventCallback.setCoreOperator(coreOperator);
- }
-
- public boolean isFinished() {
- return finished;
- }
-
- public void setFinished(boolean finished) {
- this.finished = finished;
- }
-
- public long getProcessed() {
- return nProcessed;
- }
-
- public FeedRuntimeId getRuntimeId() {
- return runtimeId;
- }
-
- @Override
- public void open() throws HyracksDataException {
- coreOperator.open();
- }
-
- @Override
- public void fail() throws HyracksDataException {
- coreOperator.fail();
- }
-
- public void reset(int nPartitions) {
- this.mBuffer.setNumberOfPartitions(nPartitions);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Reset number of partitions to " + nPartitions + " for " + this.runtimeId);
- }
- if (mBuffer != null) {
- mBuffer.reset();
- }
- }
-
- public FeedConnectionId getConnectionId() {
- return connectionId;
- }
-
- public IFeedManager getFeedManager() {
- return feedManager;
- }
-
- public MonitoredBuffer getmBuffer() {
- return mBuffer;
- }
-
- public boolean isThrottlingEnabled() {
- return throttlingEnabled;
- }
-
- public void setThrottlingEnabled(boolean throttlingEnabled) {
- if (this.throttlingEnabled != throttlingEnabled) {
- this.throttlingEnabled = throttlingEnabled;
- IFeedMessage throttlingEnabledMesg = new ThrottlingEnabledFeedMessage(connectionId, runtimeId);
- feedManager.getFeedMessageService().sendMessage(throttlingEnabledMesg);
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Throttling " + throttlingEnabled + " for " + this.connectionId + "[" + runtimeId + "]");
- }
- }
- }
-
- public boolean isBufferingEnabled() {
- return bufferingEnabled;
- }
-
- public void setBufferingEnabled(boolean bufferingEnabled) {
- this.bufferingEnabled = bufferingEnabled;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeManager.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeManager.java
deleted file mode 100644
index abd5daa..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeManager.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedConnectionManager;
-
-public class FeedRuntimeManager {
-
- private static Logger LOGGER = Logger.getLogger(FeedRuntimeManager.class.getName());
-
- private final FeedConnectionId connectionId;
- private final IFeedConnectionManager connectionManager;
- private final Map<FeedRuntimeId, FeedRuntime> feedRuntimes;
-
- private final ExecutorService executorService;
-
- public FeedRuntimeManager(FeedConnectionId connectionId, IFeedConnectionManager feedConnectionManager) {
- this.connectionId = connectionId;
- this.feedRuntimes = new ConcurrentHashMap<FeedRuntimeId, FeedRuntime>();
- this.executorService = Executors.newCachedThreadPool();
- this.connectionManager = feedConnectionManager;
- }
-
- public void close() throws IOException {
- if (executorService != null) {
- executorService.shutdownNow();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Shut down executor service for :" + connectionId);
- }
- }
- }
-
- public FeedRuntime getFeedRuntime(FeedRuntimeId runtimeId) {
- return feedRuntimes.get(runtimeId);
- }
-
- public void registerFeedRuntime(FeedRuntimeId runtimeId, FeedRuntime feedRuntime) {
- feedRuntimes.put(runtimeId, feedRuntime);
- }
-
- public synchronized void deregisterFeedRuntime(FeedRuntimeId runtimeId) {
- feedRuntimes.remove(runtimeId);
- if (feedRuntimes.isEmpty()) {
- connectionManager.deregisterFeed(connectionId);
- }
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public Set<FeedRuntimeId> getFeedRuntimes() {
- return feedRuntimes.keySet();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeReport.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeReport.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeReport.java
deleted file mode 100644
index d7717ac..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeReport.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-public class FeedRuntimeReport {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitAckMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitAckMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitAckMessage.java
deleted file mode 100644
index ada6566..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitAckMessage.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import javax.xml.bind.DatatypeConverter;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.feeds.message.FeedMessage;
-
-public class FeedTupleCommitAckMessage extends FeedMessage {
-
- private static final long serialVersionUID = 1L;
-
- private final FeedConnectionId connectionId;
- private int intakePartition;
- private int base;
- private byte[] commitAcks;
-
- public FeedTupleCommitAckMessage(FeedConnectionId connectionId, int intakePartition, int base, byte[] commitAcks) {
- super(MessageType.COMMIT_ACK);
- this.connectionId = connectionId;
- this.intakePartition = intakePartition;
- this.base = base;
- this.commitAcks = commitAcks;
- }
-
- @Override
- public JSONObject toJSON() throws JSONException {
- JSONObject obj = new JSONObject();
- obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
- obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
- obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
- obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
- obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
- obj.put(FeedConstants.MessageConstants.BASE, base);
- String commitAcksString = DatatypeConverter.printBase64Binary(commitAcks);
- obj.put(FeedConstants.MessageConstants.COMMIT_ACKS, commitAcksString);
- return obj;
- }
-
- public static FeedTupleCommitAckMessage read(JSONObject obj) throws JSONException {
- FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
- obj.getString(FeedConstants.MessageConstants.FEED));
- FeedConnectionId connectionId = new FeedConnectionId(feedId,
- obj.getString(FeedConstants.MessageConstants.DATASET));
- int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
- int base = obj.getInt(FeedConstants.MessageConstants.BASE);
- String commitAcksString = obj.getString(FeedConstants.MessageConstants.COMMIT_ACKS);
- byte[] commitAcks = DatatypeConverter.parseBase64Binary(commitAcksString);
- return new FeedTupleCommitAckMessage(connectionId, intakePartition, base, commitAcks);
- }
-
- public FeedConnectionId getConnectionId() {
- return connectionId;
- }
-
- public int getIntakePartition() {
- return intakePartition;
- }
-
- public byte[] getCommitAcks() {
- return commitAcks;
- }
-
- public void reset(int intakePartition, int base, byte[] commitAcks) {
- this.intakePartition = intakePartition;
- this.base = base;
- this.commitAcks = commitAcks;
- }
-
- public int getBase() {
- return base;
- }
-
- public void setBase(int base) {
- this.base = base;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitResponseMessage.java
deleted file mode 100644
index cc32034..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitResponseMessage.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.feeds.message.FeedMessage;
-
-public class FeedTupleCommitResponseMessage extends FeedMessage {
-
- private static final long serialVersionUID = 1L;
-
- private final FeedConnectionId connectionId;
- private final int intakePartition;
- private final int maxWindowAcked;
-
- public FeedTupleCommitResponseMessage(FeedConnectionId connectionId, int intakePartition, int maxWindowAcked) {
- super(MessageType.COMMIT_ACK_RESPONSE);
- this.connectionId = connectionId;
- this.intakePartition = intakePartition;
- this.maxWindowAcked = maxWindowAcked;
- }
-
- @Override
- public JSONObject toJSON() throws JSONException {
- JSONObject obj = new JSONObject();
- obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
- obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
- obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
- obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
- obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
- obj.put(FeedConstants.MessageConstants.MAX_WINDOW_ACKED, maxWindowAcked);
- return obj;
- }
-
- @Override
- public String toString() {
- return connectionId + "[" + intakePartition + "]" + "(" + maxWindowAcked + ")";
- }
-
- public static FeedTupleCommitResponseMessage read(JSONObject obj) throws JSONException {
- FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
- obj.getString(FeedConstants.MessageConstants.FEED));
- FeedConnectionId connectionId = new FeedConnectionId(feedId,
- obj.getString(FeedConstants.MessageConstants.DATASET));
- int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
- int maxWindowAcked = obj.getInt(FeedConstants.MessageConstants.MAX_WINDOW_ACKED);
- return new FeedTupleCommitResponseMessage(connectionId, intakePartition, maxWindowAcked);
- }
-
- public FeedConnectionId getConnectionId() {
- return connectionId;
- }
-
- public int getMaxWindowAcked() {
- return maxWindowAcked;
- }
-
- public int getIntakePartition() {
- return intakePartition;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameCollection.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameCollection.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameCollection.java
deleted file mode 100644
index 9ed547e..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameCollection.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.asterix.common.feeds.api.IFeedMemoryComponent;
-import org.apache.asterix.common.feeds.api.IFeedMemoryManager;
-
-/**
- * Represents an expandable collection of frames.
- */
-public class FrameCollection implements IFeedMemoryComponent {
-
- /** A unique identifier for the feed memory component **/
- private final int componentId;
-
- /** A collection of frames (each being a ByteBuffer) **/
- private final List<ByteBuffer> frames = new LinkedList<ByteBuffer>();
-
- /** The permitted maximum size, the collection may grow to **/
- private int maxSize;
-
- /** The {@link IFeedMemoryManager} for the NodeController **/
- private final IFeedMemoryManager memoryManager;
-
- public FrameCollection(int componentId, IFeedMemoryManager memoryManager, int maxSize) {
- this.componentId = componentId;
- this.maxSize = maxSize;
- this.memoryManager = memoryManager;
- }
-
- public boolean addFrame(ByteBuffer frame) {
- if (frames.size() == maxSize) {
- boolean expansionGranted = memoryManager.expandMemoryComponent(this);
- if (!expansionGranted) {
- return false;
- }
- }
- ByteBuffer storageBuffer = ByteBuffer.allocate(frame.capacity());
- storageBuffer.put(frame);
- frames.add(storageBuffer);
- storageBuffer.flip();
- return true;
- }
-
- public Iterator<ByteBuffer> getFrameCollectionIterator() {
- return frames.iterator();
- }
-
- @Override
- public int getTotalAllocation() {
- return frames.size();
- }
-
- @Override
- public Type getType() {
- return Type.COLLECTION;
- }
-
- @Override
- public int getComponentId() {
- return componentId;
- }
-
- @Override
- public void expand(int delta) {
- maxSize = maxSize + delta;
- }
-
- @Override
- public void reset() {
- frames.clear();
- maxSize = IFeedMemoryManager.START_COLLECTION_SIZE;
- }
-
- @Override
- public String toString() {
- return "FrameCollection" + "[" + componentId + "]" + "(" + frames.size() + "/" + maxSize + ")";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameDistributor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameDistributor.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameDistributor.java
deleted file mode 100644
index 9e106fb..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameDistributor.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedMemoryComponent.Type;
-import org.apache.asterix.common.feeds.api.IFeedMemoryManager;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class FrameDistributor {
-
- private static final Logger LOGGER = Logger.getLogger(FrameDistributor.class.getName());
-
- private static final long MEMORY_AVAILABLE_POLL_PERIOD = 1000; // 1 second
-
- private final FeedId feedId;
- private final FeedRuntimeType feedRuntimeType;
- private final int partition;
- private final IFeedMemoryManager memoryManager;
- private final boolean enableSynchronousTransfer;
- /** A map storing the registered frame readers ({@code FeedFrameCollector}. **/
- private final Map<IFrameWriter, FeedFrameCollector> registeredCollectors;
- private final FrameTupleAccessor fta;
-
- private DataBucketPool pool;
- private DistributionMode distributionMode;
- private boolean spillToDiskRequired = false;
-
- public enum DistributionMode {
- /**
- * A single feed frame collector is registered for receiving tuples.
- * Tuple is sent via synchronous call, that is no buffering is involved
- **/
- SINGLE,
-
- /**
- * Multiple feed frame collectors are concurrently registered for
- * receiving tuples.
- **/
- SHARED,
-
- /**
- * Feed tuples are not being processed, irrespective of # of registered
- * feed frame collectors.
- **/
- INACTIVE
- }
-
- public FrameDistributor(FeedId feedId, FeedRuntimeType feedRuntimeType, int partition,
- boolean enableSynchronousTransfer, IFeedMemoryManager memoryManager, FrameTupleAccessor fta)
- throws HyracksDataException {
- this.feedId = feedId;
- this.feedRuntimeType = feedRuntimeType;
- this.partition = partition;
- this.memoryManager = memoryManager;
- this.enableSynchronousTransfer = enableSynchronousTransfer;
- this.registeredCollectors = new HashMap<IFrameWriter, FeedFrameCollector>();
- this.distributionMode = DistributionMode.INACTIVE;
- this.fta = fta;
- }
-
- public void notifyEndOfFeed() {
- DataBucket bucket = getDataBucket();
- if (bucket != null) {
- sendEndOfFeedDataBucket(bucket);
- } else {
- while (bucket == null) {
- try {
- Thread.sleep(MEMORY_AVAILABLE_POLL_PERIOD);
- bucket = getDataBucket();
- } catch (InterruptedException e) {
- break;
- }
- }
- if (bucket != null) {
- sendEndOfFeedDataBucket(bucket);
- }
- }
- }
-
- private void sendEndOfFeedDataBucket(DataBucket bucket) {
- bucket.setContentType(DataBucket.ContentType.EOD);
- nextBucket(bucket);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("End of feed data packet sent " + this.feedId);
- }
- }
-
- public synchronized void registerFrameCollector(FeedFrameCollector frameCollector) {
- DistributionMode currentMode = distributionMode;
- switch (distributionMode) {
- case INACTIVE:
- if (!enableSynchronousTransfer) {
- pool = (DataBucketPool) memoryManager.getMemoryComponent(Type.POOL);
- frameCollector.start();
- }
- registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
- setMode(DistributionMode.SINGLE);
- break;
- case SINGLE:
- pool = (DataBucketPool) memoryManager.getMemoryComponent(Type.POOL);
- registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
- for (FeedFrameCollector reader : registeredCollectors.values()) {
- reader.start();
- }
- setMode(DistributionMode.SHARED);
- break;
- case SHARED:
- frameCollector.start();
- registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
- break;
- }
- evaluateIfSpillIsEnabled();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(
- "Switching to " + distributionMode + " mode from " + currentMode + " mode " + " Feed id " + feedId);
- }
- }
-
- public synchronized void deregisterFrameCollector(FeedFrameCollector frameCollector) {
- switch (distributionMode) {
- case INACTIVE:
- throw new IllegalStateException(
- "Invalid attempt to deregister frame collector in " + distributionMode + " mode.");
- case SHARED:
- frameCollector.closeCollector();
- registeredCollectors.remove(frameCollector.getFrameWriter());
- int nCollectors = registeredCollectors.size();
- if (nCollectors == 1) {
- FeedFrameCollector loneCollector = registeredCollectors.values().iterator().next();
- setMode(DistributionMode.SINGLE);
- loneCollector.setState(FeedFrameCollector.State.TRANSITION);
- loneCollector.closeCollector();
- memoryManager.releaseMemoryComponent(pool);
- evaluateIfSpillIsEnabled();
- } else {
- if (!spillToDiskRequired) {
- evaluateIfSpillIsEnabled();
- }
- }
- break;
- case SINGLE:
- frameCollector.closeCollector();
- setMode(DistributionMode.INACTIVE);
- spillToDiskRequired = false;
- break;
-
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Deregistered frame reader" + frameCollector + " from feed distributor for " + feedId);
- }
- }
-
- public void evaluateIfSpillIsEnabled() {
- if (!spillToDiskRequired) {
- for (FeedFrameCollector collector : registeredCollectors.values()) {
- spillToDiskRequired = spillToDiskRequired
- || collector.getFeedPolicyAccessor().spillToDiskOnCongestion();
- if (spillToDiskRequired) {
- break;
- }
- }
- }
- }
-
- public boolean deregisterFrameCollector(IFrameWriter frameWriter) {
- FeedFrameCollector collector = registeredCollectors.get(frameWriter);
- if (collector != null) {
- deregisterFrameCollector(collector);
- return true;
- }
- return false;
- }
-
- public synchronized void setMode(DistributionMode mode) {
- this.distributionMode = mode;
- }
-
- public boolean isRegistered(IFrameWriter writer) {
- return registeredCollectors.get(writer) != null;
- }
-
- public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
- switch (distributionMode) {
- case INACTIVE:
- break;
- case SINGLE:
- FeedFrameCollector collector = registeredCollectors.values().iterator().next();
- switch (collector.getState()) {
- case HANDOVER:
- case ACTIVE:
- if (enableSynchronousTransfer) {
- collector.nextFrame(frame); // processing is synchronous
- } else {
- handleDataBucket(frame);
- }
- break;
- case TRANSITION:
- handleDataBucket(frame);
- break;
- case FINISHED:
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Discarding fetched tuples, feed has ended [" + registeredCollectors.get(0)
- + "]" + " Feed Id " + feedId + " frame distributor " + this.getFeedRuntimeType());
- }
- registeredCollectors.remove(0);
- break;
- }
- break;
- case SHARED:
- handleDataBucket(frame);
- break;
- }
- }
-
- private void nextBucket(DataBucket bucket) {
- for (FeedFrameCollector collector : registeredCollectors.values()) {
- collector.sendMessage(bucket); // asynchronous call
- }
- }
-
- private void handleDataBucket(ByteBuffer frame) throws HyracksDataException {
- DataBucket bucket = getDataBucket();
- if (bucket == null) {
- handleFrameDuringMemoryCongestion(frame);
- } else {
- bucket.reset(frame);
- bucket.setDesiredReadCount(registeredCollectors.size());
- nextBucket(bucket);
- }
- }
-
- private void handleFrameDuringMemoryCongestion(ByteBuffer frame) throws HyracksDataException {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to allocate memory, will evaluate the need to spill");
- }
- // wait till memory is available
- }
-
- private DataBucket getDataBucket() {
- DataBucket bucket = null;
- if (pool != null) {
- bucket = pool.getDataBucket();
- if (bucket != null) {
- bucket.setDesiredReadCount(registeredCollectors.size());
- return bucket;
- } else {
- return null;
- }
- }
- return null;
- }
-
- public DistributionMode getMode() {
- return distributionMode;
- }
-
- public void close() {
- switch (distributionMode) {
- case INACTIVE:
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("FrameDistributor is " + distributionMode);
- }
- break;
- case SINGLE:
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Disconnecting single frame reader in " + distributionMode + " mode " + " for feedId "
- + feedId + " " + this.feedRuntimeType);
- }
- setMode(DistributionMode.INACTIVE);
- if (!enableSynchronousTransfer) {
- notifyEndOfFeed(); // send EOD Data Bucket
- waitForCollectorsToFinish();
- }
- registeredCollectors.values().iterator().next().disconnect();
- break;
- case SHARED:
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Signalling End Of Feed; currently operating in " + distributionMode + " mode");
- }
- notifyEndOfFeed(); // send EOD Data Bucket
- waitForCollectorsToFinish();
- break;
- }
- }
-
- private void waitForCollectorsToFinish() {
- synchronized (registeredCollectors.values()) {
- while (!allCollectorsFinished()) {
- try {
- registeredCollectors.values().wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- private boolean allCollectorsFinished() {
- boolean allFinished = true;
- for (FeedFrameCollector collector : registeredCollectors.values()) {
- allFinished = allFinished && collector.getState().equals(FeedFrameCollector.State.FINISHED);
- }
- return allFinished;
- }
-
- public Collection<FeedFrameCollector> getRegisteredCollectors() {
- return registeredCollectors.values();
- }
-
- public Map<IFrameWriter, FeedFrameCollector> getRegisteredReaders() {
- return registeredCollectors;
- }
-
- public FeedId getFeedId() {
- return feedId;
- }
-
- public DistributionMode getDistributionMode() {
- return distributionMode;
- }
-
- public FeedRuntimeType getFeedRuntimeType() {
- return feedRuntimeType;
- }
-
- public int getPartition() {
- return partition;
- }
-
- public FrameTupleAccessor getFta() {
- return fta;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameEventCallback.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameEventCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameEventCallback.java
deleted file mode 100644
index 5551ce6..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameEventCallback.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class FrameEventCallback implements IFrameEventCallback {
-
- private static final Logger LOGGER = Logger.getLogger(FrameEventCallback.class.getName());
-
- private final FeedPolicyAccessor fpa;
- private final FeedRuntimeInputHandler inputSideHandler;
- private IFrameWriter coreOperator;
-
- public FrameEventCallback(FeedPolicyAccessor fpa, FeedRuntimeInputHandler inputSideHandler,
- IFrameWriter coreOperator) {
- this.fpa = fpa;
- this.inputSideHandler = inputSideHandler;
- this.coreOperator = coreOperator;
- }
-
- @Override
- public void frameEvent(FrameEvent event) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Frame Event for " + inputSideHandler.getRuntimeId() + " " + event);
- }
- if (!event.equals(FrameEvent.FINISHED_PROCESSING_SPILLAGE)
- && inputSideHandler.getMode().equals(Mode.PROCESS_SPILL)) {
- return;
- }
- switch (event) {
- case PENDING_WORK_THRESHOLD_REACHED:
- if (fpa.spillToDiskOnCongestion()) {
- inputSideHandler.setMode(Mode.SPILL);
- } else if (fpa.discardOnCongestion()) {
- inputSideHandler.setMode(Mode.DISCARD);
- } else if (fpa.throttlingEnabled()) {
- inputSideHandler.setThrottlingEnabled(true);
- } else {
- try {
- inputSideHandler.reportUnresolvableCongestion();
- } catch (HyracksDataException e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to report congestion!!!");
- }
- }
- }
- break;
- case FINISHED_PROCESSING:
- inputSideHandler.setFinished(true);
- synchronized (coreOperator) {
- coreOperator.notifyAll();
- }
- break;
- case PENDING_WORK_DONE:
- switch (inputSideHandler.getMode()) {
- case SPILL:
- case DISCARD:
- case POST_SPILL_DISCARD:
- inputSideHandler.setMode(Mode.PROCESS);
- break;
- default:
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Received " + event + " ignoring as operating in " + inputSideHandler.getMode());
- }
- }
- break;
- case FINISHED_PROCESSING_SPILLAGE:
- inputSideHandler.setMode(Mode.PROCESS);
- break;
- default:
- break;
- }
- }
-
- public void setCoreOperator(IFrameWriter coreOperator) {
- this.coreOperator = coreOperator;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IngestionRuntime.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/IngestionRuntime.java
deleted file mode 100644
index 926df39..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IngestionRuntime.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.logging.Level;
-
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class IngestionRuntime extends SubscribableRuntime {
-
- private final IAdapterRuntimeManager adapterRuntimeManager;
-
- public IngestionRuntime(FeedId feedId, FeedRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
- RecordDescriptor recordDesc, IAdapterRuntimeManager adaptorRuntimeManager) {
- super(feedId, runtimeId, null, feedWriter, recordDesc);
- this.adapterRuntimeManager = adaptorRuntimeManager;
- }
-
- public void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime) throws Exception {
- FeedFrameCollector reader = dWriter.subscribeFeed(fpa, collectionRuntime.getInputHandler(),
- collectionRuntime.getConnectionId());
- collectionRuntime.setFrameCollector(reader);
-
- if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.SINGLE)) {
- adapterRuntimeManager.start();
- }
- subscribers.add(collectionRuntime);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Subscribed feed collection [" + collectionRuntime + "] to " + this);
- }
- }
-
- public void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception {
- dWriter.unsubscribeFeed(collectionRuntime.getInputHandler());
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Unsubscribed feed collection [" + collectionRuntime + "] from " + this);
- }
- if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.INACTIVE)) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Stopping adapter for " + this + " as no more registered collectors");
- }
- adapterRuntimeManager.stop();
- }
- subscribers.remove(collectionRuntime);
- }
-
- public void endOfFeed() {
- dWriter.notifyEndOfFeed();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Notified End Of Feed [" + this + "]");
- }
- }
-
- public IAdapterRuntimeManager getAdapterRuntimeManager() {
- return adapterRuntimeManager;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakePartitionStatistics.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakePartitionStatistics.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakePartitionStatistics.java
deleted file mode 100644
index 5601f73..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakePartitionStatistics.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.BitSet;
-
-public class IntakePartitionStatistics {
-
- public static int ACK_WINDOW_SIZE = 1024;
- private BitSet bitSet;
-
- public IntakePartitionStatistics(int partition, int base) {
- this.bitSet = new BitSet(ACK_WINDOW_SIZE);
- }
-
- public void ackRecordId(int recordId) {
- int posIndexWithinBase = recordId % ACK_WINDOW_SIZE;
- this.bitSet.set(posIndexWithinBase);
- }
-
- public byte[] getAckInfo() {
- return bitSet.toByteArray();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakeSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakeSideMonitoredBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakeSideMonitoredBuffer.java
deleted file mode 100644
index 10b7ddb..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakeSideMonitoredBuffer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IExceptionHandler;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback;
-import org.apache.asterix.common.feeds.api.IFramePostProcessor;
-import org.apache.asterix.common.feeds.api.IFramePreprocessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class IntakeSideMonitoredBuffer extends MonitoredBuffer {
-
- public IntakeSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
- FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
- FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
- IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
- super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
- exceptionHandler, callback, nPartitions, policyAccessor);
- }
-
- @Override
- protected boolean monitorProcessingRate() {
- return false;
- }
-
- @Override
- protected boolean logInflowOutflowRate() {
- return false;
- }
-
- @Override
- protected IFramePreprocessor getFramePreProcessor() {
- return null;
- }
-
- @Override
- protected IFramePostProcessor getFramePostProcessor() {
- return null;
- }
-
- @Override
- protected boolean monitorInputQueueLength() {
- return false;
- }
-
- @Override
- protected boolean reportOutflowRate() {
- return false;
- }
-
- @Override
- protected boolean reportInflowRate() {
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageListener.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageListener.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageListener.java
deleted file mode 100644
index c2753d4..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageListener.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.nio.CharBuffer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class MessageListener {
-
- private static final Logger LOGGER = Logger.getLogger(MessageListener.class.getName());
-
- private int port;
- private final LinkedBlockingQueue<String> outbox;
-
- private ExecutorService executorService = Executors.newFixedThreadPool(10);
-
- private MessageListenerServer listenerServer;
-
- public MessageListener(int port, LinkedBlockingQueue<String> outbox) {
- this.port = port;
- this.outbox = outbox;
- }
-
- public void stop() {
- listenerServer.stop();
- if (!executorService.isShutdown()) {
- executorService.shutdownNow();
- }
- }
-
- public void start() throws IOException {
- listenerServer = new MessageListenerServer(port, outbox);
- executorService.execute(listenerServer);
- }
-
- private static class MessageListenerServer implements Runnable {
-
- private final int port;
- private final LinkedBlockingQueue<String> outbox;
- private ServerSocket server;
-
- private static final char EOL = (char) "\n".getBytes()[0];
-
- public MessageListenerServer(int port, LinkedBlockingQueue<String> outbox) {
- this.port = port;
- this.outbox = outbox;
- }
-
- public void stop() {
- try {
- server.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void run() {
- Socket client = null;
- try {
- server = new ServerSocket(port);
- client = server.accept();
- InputStream in = client.getInputStream();
- CharBuffer buffer = CharBuffer.allocate(5000);
- char ch;
- while (true) {
- ch = (char) in.read();
- if (((int) ch) == -1) {
- break;
- }
- while (ch != EOL) {
- buffer.put(ch);
- ch = (char) in.read();
- }
- buffer.flip();
- String s = new String(buffer.array());
- synchronized (outbox) {
- outbox.add(s + "\n");
- }
- buffer.position(0);
- buffer.limit(5000);
- }
-
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to start Message listener" + server);
- }
- } finally {
- if (server != null) {
- try {
- server.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageReceiver.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageReceiver.java
deleted file mode 100644
index 6490c6a..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageReceiver.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IMessageReceiver;
-
-public abstract class MessageReceiver<T> implements IMessageReceiver<T> {
-
- protected static final Logger LOGGER = Logger.getLogger(MessageReceiver.class.getName());
-
- protected final LinkedBlockingQueue<T> inbox;
- protected ExecutorService executor;
-
- public MessageReceiver() {
- inbox = new LinkedBlockingQueue<T>();
- }
-
- public abstract void processMessage(T message) throws Exception;
-
- @Override
- public void start() {
- executor = Executors.newSingleThreadExecutor();
- executor.execute(new MessageReceiverRunnable<T>(this));
- }
-
- @Override
- public synchronized void sendMessage(T message) {
- inbox.add(message);
- }
-
- @Override
- public void close(boolean processPending) {
- if (executor != null) {
- executor.shutdown();
- executor = null;
- if (processPending) {
- flushPendingMessages();
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Will discard the pending frames " + inbox.size());
- }
- }
- }
- }
-
- private static class MessageReceiverRunnable<T> implements Runnable {
-
- private final LinkedBlockingQueue<T> inbox;
- private final MessageReceiver<T> messageReceiver;
-
- public MessageReceiverRunnable(MessageReceiver<T> messageReceiver) {
- this.inbox = messageReceiver.inbox;
- this.messageReceiver = messageReceiver;
- }
-
- @Override
- public void run() {
- while (true) {
- try {
- T message = inbox.take();
- messageReceiver.processMessage(message);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- protected void flushPendingMessages() {
- while (!inbox.isEmpty()) {
- T message = null;
- try {
- message = inbox.take();
- processMessage(message);
- } catch (InterruptedException ie) {
- // ignore exception but break from the loop
- break;
- } catch (Exception e) {
- e.printStackTrace();
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception " + e + " in processing message " + message);
- }
- }
- }
- }
-
-}