You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:15 UTC
[21/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedExceptionHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedExceptionHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedExceptionHandler.java
deleted file mode 100644
index f1728ce..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedExceptionHandler.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.DataInputStream;
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.exceptions.FrameDataException;
-import org.apache.asterix.common.feeds.api.IExceptionHandler;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-
-/**
- * {@link IExceptionHandler} used while processing frames of a feed connection.
- * <p>
- * Only {@link FrameDataException} is acted upon: the tuple it points at is logged and the frame is
- * handed to {@link FeedFrameUtil#getSlicedFrame} together with the failing tuple index. For every
- * other exception type, and whenever the handling itself fails, {@code null} is returned.
- */
-public class FeedExceptionHandler implements IExceptionHandler {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedExceptionHandler.class.getName());
-
-    private final IHyracksTaskContext ctx;
-    private final FrameTupleAccessor fta;
-    private final RecordDescriptor recordDesc;
-    private final IFeedManager feedManager;
-    private final FeedConnectionId connectionId;
-
-    public FeedExceptionHandler(IHyracksTaskContext ctx, FrameTupleAccessor fta, RecordDescriptor recordDesc,
-            IFeedManager feedManager, FeedConnectionId connectionId) {
-        this.ctx = ctx;
-        this.fta = fta;
-        this.recordDesc = recordDesc;
-        this.feedManager = feedManager;
-        this.connectionId = connectionId;
-    }
-
-    /**
-     * Reacts to an exception raised while {@code frame} was being processed.
-     *
-     * @param e
-     *            the exception that was raised
-     * @param frame
-     *            the frame that was being processed when {@code e} was raised
-     * @return the result of {@link FeedFrameUtil#getSlicedFrame} for the failing tuple index when
-     *         {@code e} is a {@link FrameDataException}; {@code null} for any other exception type or
-     *         when the handling itself fails
-     */
-    public ByteBuffer handleException(Exception e, ByteBuffer frame) {
-        if (!(e instanceof FrameDataException)) {
-            return null;
-        }
-        try {
-            fta.reset(frame);
-            int tupleIndex = ((FrameDataException) e).getTupleIndex();
-
-            // Logging the tuple is best effort: a failure here must not prevent the slicing below.
-            try {
-                logExceptionCausingTuple(tupleIndex, e);
-            } catch (Exception ex) {
-                ex.addSuppressed(e);
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    // Pass the throwable so the stack trace is kept, not only the message.
-                    LOGGER.log(Level.WARNING, "Unable to log exception causing tuple due to..." + ex.getMessage(),
-                            ex);
-                }
-            }
-
-            return FeedFrameUtil.getSlicedFrame(ctx, tupleIndex, fta);
-        } catch (Exception exception) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.log(Level.WARNING, "Unable to handle exception " + exception.getMessage(), exception);
-            }
-            return null;
-        }
-    }
-
-    /**
-     * Deserializes the fields of the tuple at {@code tupleIndex} from the accessor's current buffer.
-     * The first field is recorded through the feed metadata manager together with the message of
-     * {@code e}; every further field is written to the log at WARNING level.
-     *
-     * @param tupleIndex
-     *            index of the tuple within the frame the accessor was last reset to
-     * @param e
-     *            the exception whose message is recorded alongside the tuple
-     */
-    private void logExceptionCausingTuple(int tupleIndex, Exception e) throws HyracksDataException, AsterixException {
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
-        DataInputStream di = new DataInputStream(bbis);
-
-        // Reading starts past the tuple's field slots.
-        int start = fta.getTupleStartOffset(tupleIndex) + fta.getFieldSlotsLength();
-        bbis.setByteBuffer(fta.getBuffer(), start);
-
-        int fieldCount = recordDesc.getFieldCount();
-        for (int i = 0; i < fieldCount; ++i) {
-            // Every field is deserialized, in order, from the same stream.
-            Object instance = recordDesc.getFields()[i].deserialize(di);
-            if (i == 0) {
-                String tuple = String.valueOf(instance);
-                feedManager.getFeedMetadataManager().logTuple(connectionId, tuple, e.getMessage(), feedManager);
-            } else if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning(", " + String.valueOf(instance));
-            }
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCache.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCache.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCache.java
deleted file mode 100644
index 55a7fb8..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCache.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.asterix.common.feeds.FeedConstants.StatisticsConstants;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * Allows caching of feed frames. This class is used in providing upstream backup.
- * The tuples at the intake layer are held in this cache until these are acked by
- * the storage layer post their persistence. On receiving an ack, every cached frame
- * whose largest record id is less than or equal to the acked record id is dropped
- * from the cache (see {@link #dropTillRecordId(int)}).
- */
-public class FeedFrameCache extends MessageReceiver<ByteBuffer> {
-
-    /**
-     * Value represents a cache feed frame
-     * Key represents the largest record Id in the frame.
-     * At the intake side, the largest record id corresponds to the last record in the frame
-     **/
-    private final Map<Integer, ByteBuffer> orderedCache;
-    private final FrameTupleAccessor tupleAccessor;
-    private final IFrameWriter frameWriter;
-    private final IHyracksTaskContext ctx;
-
-    public FeedFrameCache(IHyracksTaskContext ctx, FrameTupleAccessor tupleAccessor, IFrameWriter frameWriter) {
-        this.tupleAccessor = tupleAccessor;
-        this.frameWriter = frameWriter;
-        /** A LinkedHashMap ensures entries are retrieved in order of their insertion **/
-        this.orderedCache = new LinkedHashMap<Integer, ByteBuffer>();
-        this.ctx = ctx;
-    }
-
-    /**
-     * Stores a copy of {@code frame}, keyed by the record id of its last tuple.
-     * A frame without tuples has no last record and is not cached.
-     *
-     * @param frame
-     *            the frame to cache; its content is copied, the buffer itself is not retained
-     */
-    @Override
-    public void processMessage(ByteBuffer frame) throws Exception {
-        tupleAccessor.reset(frame);
-        if (tupleAccessor.getTupleCount() == 0) {
-            // Without this guard the record id would be looked up at tuple index -1.
-            return;
-        }
-        int lastRecordId = getLastRecordId(frame);
-        orderedCache.put(lastRecordId, cloneFrame(frame));
-    }
-
-    /**
-     * Drops, in insertion order, the leading frames whose key (largest record id) is less than or
-     * equal to {@code recordId}. The scan stops at the first frame with a larger key.
-     *
-     * @param recordId
-     *            the record id up to which (inclusive) frames are dropped
-     */
-    public void dropTillRecordId(int recordId) {
-        // Keys are collected first so the map is not modified while it is being iterated.
-        List<Integer> dropRecordIds = new ArrayList<Integer>();
-        for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
-            int recId = entry.getKey();
-            if (recId > recordId) {
-                break;
-            }
-            dropRecordIds.add(recId);
-        }
-        for (Integer r : dropRecordIds) {
-            orderedCache.remove(r);
-        }
-    }
-
-    /**
-     * Replays, from the record with id {@code startingRecordId} (inclusive), the remainder of the
-     * first cached frame whose largest record id is greater than or equal to that id.
-     * <p>
-     * NOTE(review): only that one frame is replayed; frames cached after it are not forwarded by
-     * this method. Confirm with the callers whether that is intended.
-     *
-     * @param startingRecordId
-     *            id of the first record to replay
-     * @throws HyracksDataException
-     *             if writing the replayed frame fails
-     */
-    public void replayRecords(int startingRecordId) throws HyracksDataException {
-        for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
-            // The key is the largest record id in the frame; the original comment states that it
-            // increases monotonically. The comparison is inclusive so that a replay starting at
-            // the last record of a frame still finds that frame.
-            int maxRecordIdInFrame = entry.getKey();
-            if (startingRecordId <= maxRecordIdInFrame) {
-                replayFrame(startingRecordId, entry.getValue());
-                break;
-            }
-        }
-    }
-
-    /**
-     * Replay the frame from the tuple (inclusive) with recordId as specified.
-     * Nothing is replayed when no tuple of the frame carries that record id.
-     *
-     * @param recordId
-     *            id of the first record to replay
-     * @param frame
-     *            the cached frame to search
-     * @throws HyracksDataException
-     */
-    private void replayFrame(int recordId, ByteBuffer frame) throws HyracksDataException {
-        tupleAccessor.reset(frame);
-        int nTuples = tupleAccessor.getTupleCount();
-        for (int i = 0; i < nTuples; i++) {
-            if (getRecordIdAtTupleIndex(i, frame) == recordId) {
-                replayFrame(splitFrame(i, frame));
-                return;
-            }
-        }
-    }
-
-    /**
-     * Copies the tuples from {@code beginTupleIndex} to the end of the frame into a new frame.
-     * Relies on {@code tupleAccessor} having been reset to {@code frame} by the caller.
-     */
-    private ByteBuffer splitFrame(int beginTupleIndex, ByteBuffer frame) throws HyracksDataException {
-        IFrame slicedFrame = new VSizeFrame(ctx);
-        FrameTupleAppender appender = new FrameTupleAppender();
-        appender.reset(slicedFrame, true);
-        int totalTuples = tupleAccessor.getTupleCount();
-        for (int ti = beginTupleIndex; ti < totalTuples; ti++) {
-            appender.append(tupleAccessor, ti);
-        }
-        return slicedFrame.getBuffer();
-    }
-
-    /**
-     * Replay the frame
-     *
-     * @param frame
-     * @throws HyracksDataException
-     */
-    private void replayFrame(ByteBuffer frame) throws HyracksDataException {
-        frameWriter.nextFrame(frame);
-    }
-
-    /** Returns the record id of the last tuple in {@code frame}; the frame must hold at least one tuple. */
-    private int getLastRecordId(ByteBuffer frame) {
-        tupleAccessor.reset(frame);
-        int nTuples = tupleAccessor.getTupleCount();
-        return getRecordIdAtTupleIndex(nTuples - 1, frame);
-    }
-
-    /**
-     * Reads the record id stored in the tuple at {@code tupleIndex}.
-     * <p>
-     * NOTE(review): the fixed offsets below (6, 4, 8 per open field, the length of
-     * {@code INTAKE_TUPLEID} plus 3) presumably mirror the serialized layout of the record's open
-     * part; verify against the record serializer before changing any of them.
-     */
-    private int getRecordIdAtTupleIndex(int tupleIndex, ByteBuffer frame) {
-        tupleAccessor.reset(frame);
-        int recordStart = tupleAccessor.getTupleStartOffset(tupleIndex) + tupleAccessor.getFieldSlotsLength();
-        int openPartOffset = frame.getInt(recordStart + 6);
-        int numOpenFields = frame.getInt(recordStart + openPartOffset);
-        int recordIdOffset = frame.getInt(recordStart + openPartOffset + 4 + numOpenFields * 8
-                + StatisticsConstants.INTAKE_TUPLEID.length() + 2 + 1);
-        int lastRecordId = frame.getInt(recordStart + recordIdOffset);
-        return lastRecordId;
-    }
-
-    /**
-     * Returns a heap buffer of the same capacity as {@code frame} holding a copy of the first
-     * {@code frame.limit()} bytes of its backing array.
-     */
-    private ByteBuffer cloneFrame(ByteBuffer frame) {
-        ByteBuffer clone = ByteBuffer.allocate(frame.capacity());
-        System.arraycopy(frame.array(), 0, clone.array(), 0, frame.limit());
-        return clone;
-    }
-
-    /**
-     * Forwards every cached frame, in insertion order, to the frame writer.
-     *
-     * @throws HyracksDataException
-     *             if writing a frame fails
-     */
-    public void replayAll() throws HyracksDataException {
-        for (ByteBuffer frame : orderedCache.values()) {
-            frameWriter.nextFrame(frame);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCollector.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCollector.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCollector.java
deleted file mode 100644
index a8c0e8f..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCollector.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IMessageReceiver;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Receives {@link DataBucket}s for one feed connection and forwards their content to an
- * {@link IFrameWriter}. Two collectors are equal when their connection ids are equal.
- */
-public class FeedFrameCollector extends MessageReceiver<DataBucket> implements IMessageReceiver<DataBucket> {
-
-    private final FeedConnectionId connectionId;
-    private final FrameDistributor frameDistributor;
-    private FeedPolicyAccessor fpa;
-    private IFrameWriter frameWriter;
-    private State state;
-
-    public enum State {
-        ACTIVE,
-        FINISHED,
-        TRANSITION,
-        HANDOVER
-    }
-
-    public FeedFrameCollector(FrameDistributor frameDistributor, FeedPolicyAccessor feedPolicyAccessor,
-            IFrameWriter frameWriter, FeedConnectionId connectionId) {
-        super();
-        this.frameDistributor = frameDistributor;
-        this.fpa = feedPolicyAccessor;
-        this.connectionId = connectionId;
-        this.frameWriter = frameWriter;
-        this.state = State.ACTIVE;
-    }
-
-    /**
-     * Handles one bucket according to its content type: DATA is written to the frame writer, EOD
-     * closes this collector and EOSD is reported as an error. Failures are logged, not propagated,
-     * and {@link DataBucket#doneReading()} is invoked in every case.
-     *
-     * @param bucket
-     *            the bucket to process
-     */
-    @Override
-    public void processMessage(DataBucket bucket) throws Exception {
-        try {
-            ByteBuffer frame = bucket.getContent();
-            switch (bucket.getContentType()) {
-                case DATA:
-                    frameWriter.nextFrame(frame);
-                    break;
-                case EOD:
-                    closeCollector();
-                    break;
-                case EOSD:
-                    // Caught by the handler below, so it ends up in the log rather than at the caller.
-                    throw new AsterixException("Received data bucket with content of type " + bucket.getContentType());
-            }
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                // Pass the throwable so the stack trace reaches the log.
-                LOGGER.log(Level.WARNING,
-                        "Unable to process data bucket " + bucket + ", encountered exception " + e.getMessage(), e);
-            }
-        } finally {
-            bucket.doneReading();
-        }
-    }
-
-    /**
-     * Closes this collector. In {@link State#TRANSITION} the receiver is closed and the collector
-     * returns to {@link State#ACTIVE}; in any other state pending messages are flushed, the state
-     * becomes {@link State#FINISHED} and threads waiting on the distributor's registered
-     * collectors are notified.
-     */
-    public void closeCollector() {
-        // Read through the synchronized getter, consistent with how the state is written.
-        if (getState() == State.TRANSITION) {
-            super.close(true);
-            setState(State.ACTIVE);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info(this + " is now " + State.ACTIVE + " mode, processing frames synchronously");
-            }
-        } else {
-            flushPendingMessages();
-            setState(State.FINISHED);
-            synchronized (frameDistributor.getRegisteredCollectors()) {
-                frameDistributor.getRegisteredCollectors().notifyAll();
-            }
-            disconnect();
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Closed collector " + this);
-        }
-    }
-
-    /** Marks this collector as {@link State#FINISHED}. */
-    public synchronized void disconnect() {
-        setState(State.FINISHED);
-    }
-
-    /**
-     * Writes {@code frame} directly to the frame writer.
-     *
-     * @throws HyracksDataException
-     *             if the frame writer fails
-     */
-    public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
-        frameWriter.nextFrame(frame);
-    }
-
-    public FeedPolicyAccessor getFeedPolicyAccessor() {
-        return fpa;
-    }
-
-    public synchronized State getState() {
-        return state;
-    }
-
-    /**
-     * Updates the state and, for {@link State#FINISHED} and {@link State#HANDOVER}, wakes up every
-     * thread waiting on this collector's monitor.
-     *
-     * @param state
-     *            the new state
-     */
-    public synchronized void setState(State state) {
-        this.state = state;
-        if (state == State.FINISHED || state == State.HANDOVER) {
-            notifyAll();
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Frame Collector " + this.frameDistributor.getFeedRuntimeType() + " switched to " + state);
-        }
-    }
-
-    public IFrameWriter getFrameWriter() {
-        return frameWriter;
-    }
-
-    public void setFrameWriter(IFrameWriter frameWriter) {
-        this.frameWriter = frameWriter;
-    }
-
-    @Override
-    public String toString() {
-        // The opening bracket was missing, which left an unbalanced "]" in every log line.
-        return "FrameCollector [" + connectionId + "," + state + "]";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o instanceof FeedFrameCollector) {
-            return connectionId.equals(((FeedFrameCollector) o).connectionId);
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        // NOTE(review): consistent with equals() only if equal connection ids render the same
-        // string; confirm against FeedConnectionId before switching to connectionId.hashCode().
-        return connectionId.toString().hashCode();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameDiscarder.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameDiscarder.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameDiscarder.java
deleted file mode 100644
index 8609366..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameDiscarder.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Decides whether an incoming frame may be discarded, keeping the number of discarded frames
- * within the fraction of processed frames allowed by the feed policy.
- */
-public class FeedFrameDiscarder {
-
-    // Was registered under FeedFrameSpiller's name, which attributed these log records to the wrong class.
-    private static final Logger LOGGER = Logger.getLogger(FeedFrameDiscarder.class.getName());
-
-    private final FeedRuntimeInputHandler inputHandler;
-    private final FeedConnectionId connectionId;
-    private final FeedRuntimeId runtimeId;
-    private final FeedPolicyAccessor policyAccessor;
-    private final float maxFractionDiscard;
-    private int nDiscarded;
-
-    /**
-     * @param connectionId
-     *            the feed connection, used in log messages
-     * @param runtimeId
-     *            the feed runtime, used in log messages
-     * @param policyAccessor
-     *            source of the maximum discard fraction, which is read once here and cached
-     * @param inputHandler
-     *            source of the processed count the discard limit is derived from
-     * @throws IOException
-     *             declared for callers; nothing in this constructor raises it
-     */
-    public FeedFrameDiscarder(FeedConnectionId connectionId, FeedRuntimeId runtimeId, FeedPolicyAccessor policyAccessor,
-            FeedRuntimeInputHandler inputHandler) throws IOException {
-        this.connectionId = connectionId;
-        this.runtimeId = runtimeId;
-        this.policyAccessor = policyAccessor;
-        this.inputHandler = inputHandler;
-        this.maxFractionDiscard = policyAccessor.getMaxFractionDiscard();
-    }
-
-    /**
-     * Accounts for one more discarded frame if the policy permits it.
-     *
-     * @param message
-     *            the frame under consideration; its content is not inspected
-     * @return {@code true} if the frame was counted as discarded; {@code false} if discarding is
-     *         disabled (maximum fraction is 0) or the current limit has already been reached
-     */
-    public boolean processMessage(ByteBuffer message) {
-        if (policyAccessor.getMaxFractionDiscard() == 0) {
-            return false;
-        }
-        // The limit grows with the processed count; the product is truncated towards zero.
-        long nProcessed = inputHandler.getProcessed();
-        long discardLimit = (long) (nProcessed * maxFractionDiscard);
-        if (nDiscarded >= discardLimit) {
-            return false;
-        }
-        nDiscarded++;
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Discarded frame by " + connectionId + " (" + runtimeId + ")" + " count so far ("
-                    + nDiscarded + ") Limit [" + discardLimit + "]");
-        }
-        return true;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameHandlers.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameHandlers.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameHandlers.java
deleted file mode 100644
index c4a2ce0..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameHandlers.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedFrameHandler;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * Factory for, and implementations of, {@link IFeedFrameHandler}: routing in memory to the
- * registered collectors, spilling frames to a file, or discarding them.
- */
-public class FeedFrameHandlers {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedFrameHandlers.class.getName());
-
-    public enum RoutingMode {
-        IN_MEMORY_ROUTE,
-        SPILL_TO_DISK,
-        DISCARD
-    }
-
-    /**
-     * Creates the handler that implements {@code routingMode}.
-     *
-     * @param distributor
-     *            the frame distributor the handler works for; for in-memory routing the values
-     *            collection of its registered readers is handed to the router as is
-     * @param feedId
-     *            the feed, used for the spill file name and in summaries
-     * @param routingMode
-     *            the routing mode to implement
-     * @param runtimeType
-     *            the feed runtime type, used in log messages
-     * @param partition
-     *            the partition, used in log messages
-     * @param frameSize
-     *            size of the buffer used to read spilled frames back
-     * @return the handler for {@code routingMode}
-     * @throws IOException
-     *             if the handler cannot be created
-     */
-    public static IFeedFrameHandler getFeedFrameHandler(FrameDistributor distributor, FeedId feedId,
-            RoutingMode routingMode, FeedRuntimeType runtimeType, int partition, int frameSize) throws IOException {
-        switch (routingMode) {
-            case IN_MEMORY_ROUTE:
-                return new InMemoryRouter(distributor.getRegisteredReaders().values(), runtimeType, partition);
-            case SPILL_TO_DISK:
-                return new DiskSpiller(distributor, feedId, runtimeType, partition, frameSize);
-            case DISCARD:
-                return new DiscardRouter(distributor, feedId, runtimeType, partition);
-            default:
-                throw new IllegalArgumentException("Invalid routing mode " + routingMode);
-        }
-    }
-
-    /**
-     * Handler that drops whatever it is given and only keeps a counter. The counter is increased
-     * by the tuple count for frames and by one for data buckets.
-     */
-    public static class DiscardRouter implements IFeedFrameHandler {
-
-        private final FeedId feedId;
-        private int nDiscarded;
-        private final FeedRuntimeType runtimeType;
-        private final int partition;
-        private final FrameDistributor distributor;
-
-        public DiscardRouter(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition)
-                throws HyracksDataException {
-            this.distributor = distributor;
-            this.feedId = feedId;
-            this.nDiscarded = 0;
-            this.runtimeType = runtimeType;
-            this.partition = partition;
-        }
-
-        @Override
-        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
-            // The distributor's accessor is used only to count the tuples being dropped.
-            FrameTupleAccessor fta = distributor.getFta();
-            fta.reset(frame);
-            int nTuples = fta.getTupleCount();
-            nDiscarded += nTuples;
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Discarded additional [" + runtimeType + "](" + partition + ") " + nTuples);
-            }
-        }
-
-        @Override
-        public void handleDataBucket(DataBucket bucket) {
-            nDiscarded++;
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Discard Count" + nDiscarded);
-            }
-        }
-
-        @Override
-        public void close() {
-            // do nothing, no resource to relinquish
-        }
-
-        @Override
-        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
-            throw new IllegalStateException("Invalid operation");
-        }
-
-        @Override
-        public String toString() {
-            return "DiscardRouter[" + feedId + "](" + nDiscarded + ")";
-        }
-
-        @Override
-        public String getSummary() {
-            return "Number of discarded frames (since last reset) feedId [" + feedId + "](" + nDiscarded + ")";
-        }
-
-    }
-
-    /** Handler that passes every data bucket on to each of the given frame collectors. */
-    public static class InMemoryRouter implements IFeedFrameHandler {
-
-        private final Collection<FeedFrameCollector> frameCollectors;
-
-        public InMemoryRouter(Collection<FeedFrameCollector> frameCollectors, FeedRuntimeType runtimeType,
-                int partition) {
-            this.frameCollectors = frameCollectors;
-        }
-
-        @Override
-        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
-            throw new IllegalStateException("Operation not supported");
-        }
-
-        @Override
-        public void handleDataBucket(DataBucket bucket) {
-            for (FeedFrameCollector collector : frameCollectors) {
-                collector.sendMessage(bucket); // asynchronous call
-            }
-        }
-
-        @Override
-        public void close() {
-            // do nothing
-        }
-
-        @Override
-        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
-            throw new IllegalStateException("Operation not supported");
-        }
-
-        @Override
-        public String getSummary() {
-            return "InMemoryRouter Summary";
-        }
-    }
-
-    /**
-     * Handler that appends frames to a spill file and can read them back. Each frame is stored as
-     * a 4-byte length followed by that many bytes.
-     */
-    public static class DiskSpiller implements IFeedFrameHandler {
-
-        private FrameSpiller<ByteBuffer> receiver;
-        private Iterator<ByteBuffer> iterator;
-
-        public DiskSpiller(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition,
-                int frameSize) throws IOException {
-            receiver = new FrameSpiller<ByteBuffer>(distributor, feedId, frameSize);
-        }
-
-        @Override
-        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
-            receiver.sendMessage(frame);
-        }
-
-        @Override
-        public void handleDataBucket(DataBucket bucket) {
-            throw new IllegalStateException("Operation not supported");
-        }
-
-        @Override
-        public void close() {
-            // NOTE(review): the spill file's output stream is never closed. Whether it is safe to
-            // close it right after this call depends on MessageReceiver.close(boolean), which is
-            // not visible here.
-            receiver.close(true);
-        }
-
-        @Override
-        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
-            try {
-                iterator = receiver.replayData();
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
-            }
-            return iterator;
-        }
-
-        //TODO: Form a summary that includes stats related to what has been spilled to disk
-        @Override
-        public String getSummary() {
-            return "Disk Spiller Summary";
-        }
-
-        /** Writes received frames to a lazily created file and replays them from there. */
-        private static class FrameSpiller<T> extends MessageReceiver<ByteBuffer> {
-
-            private static final int LENGTH_FIELD_SIZE = 4;
-
-            private final FeedId feedId;
-            private BufferedOutputStream bos;
-            private final ByteBuffer reusableLengthBuffer;
-            private final ByteBuffer reusableDataBuffer;
-            private long offset;
-            private File file;
-            private final FrameDistributor frameDistributor;
-            private boolean fileCreated = false;
-
-            public FrameSpiller(FrameDistributor distributor, FeedId feedId, int frameSize) throws IOException {
-                this.feedId = feedId;
-                this.frameDistributor = distributor;
-                reusableLengthBuffer = ByteBuffer.allocate(LENGTH_FIELD_SIZE);
-                reusableDataBuffer = ByteBuffer.allocate(frameSize);
-                this.offset = 0;
-            }
-
-            /**
-             * Appends the whole backing array of {@code message} to the spill file, preceded by its
-             * length. The file is created on first use.
-             */
-            @Override
-            public void processMessage(ByteBuffer message) throws Exception {
-                if (!fileCreated) {
-                    createFile();
-                    fileCreated = true;
-                }
-                byte[] payload = message.array();
-                // Absolute put: flip() on the untouched buffer set its limit to 0, so the relative
-                // putInt() that used to follow always threw BufferOverflowException.
-                reusableLengthBuffer.putInt(0, payload.length);
-                bos.write(reusableLengthBuffer.array());
-                bos.write(payload);
-            }
-
-            private void createFile() throws IOException {
-                Date date = new Date();
-                String dateSuffix = date.toString().replace(' ', '_');
-                String fileName = feedId.toString() + "_" + frameDistributor.getFeedRuntimeType() + "_"
-                        + frameDistributor.getPartition() + "_" + dateSuffix;
-
-                file = new File(fileName);
-                if (!file.exists()) {
-                    boolean success = file.createNewFile();
-                    if (!success) {
-                        throw new IOException("Unable to create spill file for feed " + feedId);
-                    }
-                }
-                bos = new BufferedOutputStream(new FileOutputStream(file));
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Created Spill File for feed " + feedId);
-                }
-            }
-
-            /**
-             * Returns an iterator over the frames spilled so far, starting after the frames that
-             * earlier iterators have already returned.
-             * <p>
-             * Every call to {@code next()} returns the same reusable buffer with fresh content, so
-             * a frame must be consumed before the next one is requested. The underlying stream is
-             * closed once {@code hasNext()} finds no more data. {@code next()} throws
-             * {@link IllegalStateException} if a frame cannot be read completely.
-             *
-             * @return an iterator over the spilled frames; empty if nothing has been spilled yet
-             */
-            @SuppressWarnings("resource")
-            public Iterator<ByteBuffer> replayData() throws Exception {
-                if (file == null) {
-                    // Nothing was spilled, hence there is no file to open.
-                    return java.util.Collections.<ByteBuffer> emptyList().iterator();
-                }
-                if (bos != null) {
-                    // Make the frames still held in the write buffer visible to the reader.
-                    bos.flush();
-                }
-                final BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
-                try {
-                    skipFully(bis, offset);
-                } catch (IOException e) {
-                    bis.close();
-                    throw e;
-                }
-                return new Iterator<ByteBuffer>() {
-
-                    @Override
-                    public boolean hasNext() {
-                        boolean more = false;
-                        try {
-                            more = bis.available() > 0;
-                            if (!more) {
-                                bis.close();
-                            }
-                        } catch (IOException e) {
-                            if (LOGGER.isLoggable(Level.WARNING)) {
-                                LOGGER.log(Level.WARNING, "Unable to check spill file of feed " + feedId, e);
-                            }
-                        }
-                        return more;
-                    }
-
-                    @Override
-                    public ByteBuffer next() {
-                        try {
-                            readFully(bis, reusableLengthBuffer.array(), LENGTH_FIELD_SIZE);
-                            // Absolute read, independent of the buffer's position and limit.
-                            int frameSize = reusableLengthBuffer.getInt(0);
-                            if (frameSize < 0 || frameSize > reusableDataBuffer.capacity()) {
-                                throw new IllegalStateException("Spilled frame of size " + frameSize
-                                        + " does not fit the replay buffer of feed " + feedId);
-                            }
-                            readFully(bis, reusableDataBuffer.array(), frameSize);
-                            reusableDataBuffer.clear();
-                            reusableDataBuffer.limit(frameSize);
-                            offset += LENGTH_FIELD_SIZE + frameSize;
-                        } catch (IOException e) {
-                            // Returning the buffer anyway would hand stale content to the caller.
-                            throw new IllegalStateException("Unable to read spilled frame of feed " + feedId, e);
-                        }
-                        return reusableDataBuffer;
-                    }
-
-                    @Override
-                    public void remove() {
-                        // intentionally a no-op, as before
-                    }
-
-                };
-            }
-
-            /** Skips {@code count} bytes, or fewer if the stream ends first. */
-            private static void skipFully(BufferedInputStream in, long count) throws IOException {
-                long remaining = count;
-                while (remaining > 0) {
-                    long skipped = in.skip(remaining);
-                    if (skipped <= 0) {
-                        return;
-                    }
-                    remaining -= skipped;
-                }
-            }
-
-            /** Reads exactly {@code length} bytes into the start of {@code dst}; fails on a premature end. */
-            private static void readFully(BufferedInputStream in, byte[] dst, int length) throws IOException {
-                int filled = 0;
-                while (filled < length) {
-                    int n = in.read(dst, filled, length - filled);
-                    if (n < 0) {
-                        throw new IOException(
-                                "Unexpected end of spill file after " + filled + " of " + length + " bytes");
-                    }
-                    filled += n;
-                }
-            }
-
-        }
-
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameSpiller.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameSpiller.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameSpiller.java
deleted file mode 100644
index 86187b8..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameSpiller.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Spills feed frames to a file on local disk and replays them on demand.
- * <p>
- * The spill file is created lazily by the first call to {@link #processMessage(ByteBuffer)} and
- * is named after the feed connection, the runtime and the creation time. The class performs no
- * synchronization of its own.
- */
-public class FeedFrameSpiller {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedFrameSpiller.class.getName());
-
-    private final IHyracksTaskContext ctx;
-    private final FeedConnectionId connectionId;
-    private final FeedRuntimeId runtimeId;
-    private final FeedPolicyAccessor policyAccessor;
-    private BufferedOutputStream bos;
-    private File file;
-    private boolean fileCreated = false;
-    private long bytesWritten = 0;
-    private int spilledFrameCount = 0;
-
-    public FeedFrameSpiller(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-            FeedPolicyAccessor policyAccessor) throws IOException {
-        this.ctx = ctx;
-        this.connectionId = connectionId;
-        this.runtimeId = runtimeId;
-        this.policyAccessor = policyAccessor;
-    }
-
-    /**
-     * Appends the whole backing array of {@code message} to the spill file.
-     *
-     * @param message the frame to spill
-     * @return {@code false} if writing the frame would exceed the spill limit of the policy
-     *         (nothing is written in that case), {@code true} once the frame has been written
-     * @throws Exception if the spill file cannot be created or written
-     */
-    public boolean processMessage(ByteBuffer message) throws Exception {
-        if (!fileCreated) {
-            createFile();
-            fileCreated = true;
-        }
-        byte[] payload = message.array();
-        long maxAllowed = policyAccessor.getMaxSpillOnDisk();
-        if (maxAllowed != FeedPolicyAccessor.NO_LIMIT && bytesWritten + payload.length > maxAllowed) {
-            return false;
-        }
-        bos.write(payload);
-        bytesWritten += payload.length;
-        spilledFrameCount++;
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Spilled frame by " + runtimeId + " spill count " + spilledFrameCount);
-        }
-        return true;
-    }
-
-    /** Creates the spill file (if it does not exist yet) and opens the output stream on it. */
-    private void createFile() throws IOException {
-        Date date = new Date();
-        String dateSuffix = date.toString().replace(' ', '_');
-        String fileName = connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
-                + runtimeId.getFeedRuntimeType() + "_" + runtimeId.getPartition() + "_" + dateSuffix;
-
-        file = new File(fileName);
-        if (!file.exists()) {
-            if (!file.createNewFile()) {
-                throw new IOException("Unable to create spill file " + fileName + " for feed " + runtimeId);
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Created spill file " + file.getAbsolutePath());
-            }
-        }
-        bos = new BufferedOutputStream(new FileOutputStream(file));
-    }
-
-    /**
-     * Flushes pending writes and returns an iterator over the frames spilled so far.
-     *
-     * @return an iterator reading the spill file from its beginning
-     * @throws IllegalStateException if nothing has been spilled since construction or the last reset
-     * @throws Exception if the spill file cannot be flushed or opened
-     */
-    public Iterator<ByteBuffer> replayData() throws Exception {
-        if (bos == null || file == null) {
-            // Previously surfaced as a NullPointerException; make the precondition explicit.
-            throw new IllegalStateException("No spilled data to replay for feed " + runtimeId);
-        }
-        bos.flush();
-        // Hand over the File itself so the reader opens exactly the file that was written.
-        return new FrameIterator(ctx, file);
-    }
-
-    /** Reads the spill file back, one frame-sized chunk per call to {@link #next()}. */
-    private static class FrameIterator implements Iterator<ByteBuffer> {
-
-        private final BufferedInputStream bis;
-        private final IHyracksTaskContext ctx;
-        private int readFrameCount = 0;
-        private boolean closed = false;
-
-        public FrameIterator(IHyracksTaskContext ctx, File spillFile) throws FileNotFoundException {
-            bis = new BufferedInputStream(new FileInputStream(spillFile));
-            this.ctx = ctx;
-        }
-
-        @Override
-        public boolean hasNext() {
-            if (closed) {
-                // The stream was closed when the end was reached; do not touch it again.
-                return false;
-            }
-            boolean more = false;
-            try {
-                more = bis.available() > 0;
-                if (!more) {
-                    bis.close();
-                    closed = true;
-                }
-            } catch (IOException e) {
-                LOGGER.log(Level.WARNING, "Unable to check spill file for remaining frames", e);
-            }
-            return more;
-        }
-
-        @Override
-        public ByteBuffer next() {
-            IFrame frame = null;
-            try {
-                frame = new VSizeFrame(ctx);
-                byte[] data = frame.getBuffer().array();
-                Arrays.fill(data, (byte) 0);
-                int frameSize = frame.getFrameSize();
-                // A single read() may return fewer bytes than requested; keep reading until the
-                // frame is full or the file ends.
-                int filled = 0;
-                while (filled < frameSize) {
-                    int count = bis.read(data, filled, frameSize - filled);
-                    if (count < 0) {
-                        break;
-                    }
-                    filled += count;
-                }
-                readFrameCount++;
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Read spill frome " + readFrameCount);
-                }
-            } catch (IOException e) {
-                if (frame == null) {
-                    // Without a frame there is nothing to return; fail with the real cause
-                    // instead of a NullPointerException.
-                    throw new IllegalStateException("Unable to allocate a frame for replaying spilled data", e);
-                }
-                LOGGER.log(Level.WARNING, "Unable to read spilled frame " + (readFrameCount + 1), e);
-            }
-            return frame.getBuffer();
-        }
-
-        @Override
-        public void remove() {
-            // Kept as a no-op for compatibility with existing callers.
-        }
-
-    }
-
-    /**
-     * Closes the current output stream and forgets the current spill file so that the next
-     * {@link #processMessage(ByteBuffer)} starts a new one. The spill file is left on disk.
-     */
-    public void reset() {
-        // Close before dropping the reference; otherwise the file descriptor leaks.
-        close();
-        bytesWritten = 0;
-        fileCreated = false;
-        bos = null;
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Resetted the FrameSpiller!");
-        }
-    }
-
-    /** Flushes and closes the output stream, if one is open. Failures are logged, not thrown. */
-    public void close() {
-        if (bos != null) {
-            try {
-                // close() flushes first and still closes the file if that flush fails.
-                bos.close();
-            } catch (IOException e) {
-                LOGGER.log(Level.WARNING, "Unable to close spill file for feed " + runtimeId, e);
-            }
-        }
-    }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameTupleAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameTupleAccessor.java
deleted file mode 100644
index 9645bf9..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameTupleAccessor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.feeds.FeedConstants.StatisticsConstants;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * An {@link IFrameTupleAccessor} that wraps a {@link FrameTupleAccessor} and additionally
- * extracts the feed intake partition stored inside each record.
- * <p>
- * All {@link IFrameTupleAccessor} methods delegate directly to the wrapped accessor.
- */
-public class FeedFrameTupleAccessor implements IFrameTupleAccessor {
-
- private final FrameTupleAccessor frameAccessor;
- // Computed once from tuple 0 in the constructor; reset(ByteBuffer) does not recompute it.
- private final int numOpenFields;
-
- /**
-  * Wraps {@code frameAccessor} and reads the number of open fields from the first tuple of
-  * its current buffer.
-  * NOTE(review): this presumably requires the accessor to already hold a frame with at
-  * least one tuple - confirm against callers.
-  */
- public FeedFrameTupleAccessor(FrameTupleAccessor frameAccessor) {
- this.frameAccessor = frameAccessor;
- int firstRecordStart = frameAccessor.getTupleStartOffset(0) + frameAccessor.getFieldSlotsLength();
- // NOTE(review): offset 6 looks like the position of the open-part offset in the record header - confirm.
- int openPartOffsetOrig = frameAccessor.getBuffer().getInt(firstRecordStart + 6);
- numOpenFields = frameAccessor.getBuffer().getInt(firstRecordStart + openPartOffsetOrig);
- }
-
- /**
-  * Reads the int stored at a computed offset inside the record of tuple {@code tupleIndex}.
-  * The offset is: open-part offset + 4 + 8 per open field + length of
-  * {@code StatisticsConstants.INTAKE_PARTITION} + 2 + 1.
-  * NOTE(review): the constants 4, 8, 2 and 1 presumably mirror the serialized layout of the
-  * open part of a record - verify against the record serializer.
-  *
-  * @param tupleIndex index of the tuple within the current frame
-  * @return the int found at the computed offset
-  */
- public int getFeedIntakePartition(int tupleIndex) {
- ByteBuffer buffer = frameAccessor.getBuffer();
- int recordStart = frameAccessor.getTupleStartOffset(tupleIndex) + frameAccessor.getFieldSlotsLength();
- int openPartOffsetOrig = buffer.getInt(recordStart + 6);
- int partitionOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
- + StatisticsConstants.INTAKE_PARTITION.length() + 2 + 1;
- return buffer.getInt(recordStart + partitionOffset);
- }
-
-
-
- // The remaining methods delegate unchanged to the wrapped accessor.
-
- @Override
- public int getFieldCount() {
- return frameAccessor.getFieldCount();
- }
-
- @Override
- public int getFieldSlotsLength() {
- return frameAccessor.getFieldSlotsLength();
- }
-
- @Override
- public int getFieldEndOffset(int tupleIndex, int fIdx) {
- return frameAccessor.getFieldEndOffset(tupleIndex, fIdx);
- }
-
- @Override
- public int getFieldStartOffset(int tupleIndex, int fIdx) {
- return frameAccessor.getFieldStartOffset(tupleIndex, fIdx);
- }
-
- @Override
- public int getFieldLength(int tupleIndex, int fIdx) {
- return frameAccessor.getFieldLength(tupleIndex, fIdx);
- }
-
- @Override
- public int getTupleEndOffset(int tupleIndex) {
- return frameAccessor.getTupleEndOffset(tupleIndex);
- }
-
- @Override
- public int getTupleStartOffset(int tupleIndex) {
- return frameAccessor.getTupleStartOffset(tupleIndex);
- }
-
- @Override
- public int getTupleCount() {
- return frameAccessor.getTupleCount();
- }
-
- @Override
- public ByteBuffer getBuffer() {
- return frameAccessor.getBuffer();
- }
-
- @Override
- public void reset(ByteBuffer buffer) {
- frameAccessor.reset(buffer);
- }
-
- @Override
- public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
- return frameAccessor.getAbsoluteFieldStartOffset(tupleIndex, fIdx);
- }
-
- @Override
- public int getTupleLength(int tupleIndex) {
- return frameAccessor.getTupleLength(tupleIndex);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameUtil.java
deleted file mode 100644
index baf7a7c..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameUtil.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.Random;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * Static helpers that build a new frame out of selected tuples of an existing frame.
- */
-public class FeedFrameUtil {
-
-    /**
-     * Copies every tuple that follows {@code tupleIndex} into a newly allocated frame.
-     *
-     * @param ctx task context used to allocate the new frame
-     * @param tupleIndex index of the last tuple to leave out; copying starts at {@code tupleIndex + 1}
-     * @param fta accessor positioned on the source frame
-     * @return the buffer of the new frame
-     * @throws HyracksDataException if the frame cannot be allocated or a tuple cannot be appended
-     */
-    public static ByteBuffer getSlicedFrame(IHyracksTaskContext ctx, int tupleIndex, FrameTupleAccessor fta)
-            throws HyracksDataException {
-        FrameTupleAppender appender = new FrameTupleAppender();
-        IFrame slicedFrame = new VSizeFrame(ctx);
-        appender.reset(slicedFrame, true);
-        int totalTuples = fta.getTupleCount();
-        for (int ti = tupleIndex + 1; ti < totalTuples; ti++) {
-            // NOTE(review): the result of append() is not checked - confirm a full frame cannot drop tuples here.
-            appender.append(fta, ti);
-        }
-        return slicedFrame.getBuffer();
-    }
-
-    /**
-     * Copies a random selection of distinct tuples into a newly allocated frame.
-     *
-     * @param ctx task context used to allocate the new frame
-     * @param fta accessor positioned on the source frame
-     * @param sampleSize number of tuples to pick; values larger than the tuple count select
-     *        every tuple, values below one select none
-     * @return the buffer of the new frame
-     * @throws HyracksDataException if the frame cannot be allocated or a tuple cannot be appended
-     */
-    public static ByteBuffer getSampledFrame(IHyracksTaskContext ctx, FrameTupleAccessor fta, int sampleSize)
-            throws HyracksDataException {
-        NChooseKIterator it = new NChooseKIterator(fta.getTupleCount(), sampleSize);
-        FrameTupleAppender appender = new FrameTupleAppender();
-        IFrame sampledFrame = new VSizeFrame(ctx);
-        appender.reset(sampledFrame, true);
-        while (it.hasNext()) {
-            appender.append(fta, it.next());
-        }
-        return sampledFrame.getBuffer();
-    }
-
-    /** Hands out up to {@code k} distinct indexes from {@code [0, n)} in random order. */
-    private static class NChooseKIterator {
-
-        private final int n;
-        private final int k;
-        private final BitSet bSet;
-        private final Random random;
-
-        private int traversed = 0;
-
-        public NChooseKIterator(int n, int k) {
-            this.n = n;
-            // Never promise more indexes than exist; otherwise next() could not terminate.
-            this.k = Math.min(k, n);
-            this.bSet = new BitSet(n);
-            // BitSet.set(from, to) excludes 'to', so n (not n - 1) marks all of [0, n).
-            bSet.set(0, n);
-            this.random = new Random();
-        }
-
-        public boolean hasNext() {
-            return traversed < k;
-        }
-
-        /**
-         * @return the next unused index, or {@code -1} once {@code k} indexes have been returned
-         */
-        public int next() {
-            if (!hasNext()) {
-                return -1;
-            }
-            traversed++;
-            // Pick the first unused index at or after a random start, wrapping around once.
-            // traversed <= k <= n guarantees that at least one index is still unused.
-            int pos = bSet.nextSetBit(random.nextInt(n));
-            if (pos < 0) {
-                pos = bSet.nextSetBit(0);
-            }
-            bSet.clear(pos);
-            return pos;
-        }
-
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedId.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedId.java
deleted file mode 100644
index 81b7c4e..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedId.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.Serializable;
-
-/**
- * Identifies a data feed by the dataverse it lives in and its name.
- * <p>
- * Two ids are equal when both components are equal; the textual form is
- * {@code dataverse.feedName}.
- */
-public class FeedId implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private final String dataverse;
-    private final String feedName;
-
-    public FeedId(String dataverse, String feedName) {
-        this.dataverse = dataverse;
-        this.feedName = feedName;
-    }
-
-    /** @return the dataverse component of this id */
-    public String getDataverse() {
-        return dataverse;
-    }
-
-    /** @return the feed name component of this id */
-    public String getFeedName() {
-        return feedName;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof FeedId)) {
-            // Also covers null: instanceof is false for a null reference.
-            return false;
-        }
-        final FeedId that = (FeedId) o;
-        return that.getFeedName().equals(feedName) && that.getDataverse().equals(dataverse);
-    }
-
-    @Override
-    public int hashCode() {
-        // Hash of the textual form, so equal ids hash alike.
-        final String text = toString();
-        return text.hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return dataverse + "." + feedName;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedIntakeInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedIntakeInfo.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedIntakeInfo.java
deleted file mode 100644
index 0382b23..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedIntakeInfo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.List;
-
-import org.apache.asterix.common.feeds.api.IFeedJoint;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-/**
- * Job information for the intake job of a feed: the feed id, the intake feed joint and the
- * locations the intake runs on, in addition to what {@link FeedJobInfo} tracks.
- */
-public class FeedIntakeInfo extends FeedJobInfo {
-
-    private final FeedId feedId;
-    private final IFeedJoint intakeFeedJoint;
-    private List<String> intakeLocation;
-
-    /**
-     * @param jobId id of the intake job
-     * @param state initial state of the job
-     * @param jobType ignored; the job type is always {@link FeedJobInfo.JobType#INTAKE}
-     * @param feedId the feed being ingested
-     * @param intakeFeedJoint the feed joint of the intake
-     * @param spec the job specification
-     */
-    public FeedIntakeInfo(JobId jobId, FeedJobState state, JobType jobType, FeedId feedId, IFeedJoint intakeFeedJoint,
-            JobSpecification spec) {
-        super(jobId, state, FeedJobInfo.JobType.INTAKE, spec);
-        this.feedId = feedId;
-        this.intakeFeedJoint = intakeFeedJoint;
-    }
-
-    public FeedId getFeedId() {
-        return feedId;
-    }
-
-    public IFeedJoint getIntakeFeedJoint() {
-        return intakeFeedJoint;
-    }
-
-    /**
-     * Returns the specification held by the superclass. A separate copy used to be kept in
-     * this class, which made this getter ignore later calls to {@code setSpec}.
-     */
-    @Override
-    public JobSpecification getSpec() {
-        return spec;
-    }
-
-    public List<String> getIntakeLocation() {
-        return intakeLocation;
-    }
-
-    public void setIntakeLocation(List<String> intakeLocation) {
-        this.intakeLocation = intakeLocation;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJobInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJobInfo.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJobInfo.java
deleted file mode 100644
index 2db9955..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJobInfo.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-/**
- * Describes a feed-related job: its id, type, current state and specification.
- */
-public class FeedJobInfo {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedJobInfo.class.getName());
-
-    /** The kind of job being tracked. */
-    public enum JobType {
-        INTAKE,
-        FEED_CONNECT
-    }
-
-    /** The states a tracked job can be in. */
-    public enum FeedJobState {
-        CREATED,
-        ACTIVE,
-        UNDER_RECOVERY,
-        ENDED
-    }
-
-    protected final JobId jobId;
-    protected final JobType jobType;
-    protected FeedJobState state;
-    protected JobSpecification spec;
-
-    public FeedJobInfo(JobId jobId, FeedJobState state, JobType jobType, JobSpecification spec) {
-        this.jobId = jobId;
-        this.jobType = jobType;
-        this.state = state;
-        this.spec = spec;
-    }
-
-    public JobId getJobId() {
-        return jobId;
-    }
-
-    public JobType getJobType() {
-        return jobType;
-    }
-
-    public FeedJobState getState() {
-        return state;
-    }
-
-    /** Records the new state and reports the transition at INFO level. */
-    public void setState(FeedJobState state) {
-        this.state = state;
-        if (!LOGGER.isLoggable(Level.INFO)) {
-            return;
-        }
-        LOGGER.info(toString() + " is in " + state + " state.");
-    }
-
-    public JobSpecification getSpec() {
-        return spec;
-    }
-
-    public void setSpec(JobSpecification spec) {
-        this.spec = spec;
-    }
-
-    /** @return the job id followed by the job type in square brackets */
-    @Override
-    public String toString() {
-        return String.valueOf(jobId) + " [" + jobType + "]";
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJointKey.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJointKey.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJointKey.java
deleted file mode 100644
index 8005967..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJointKey.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * Key that uniquely identifies a feed joint. A feed joint is a logical point along a feed
- * ingestion pipeline where the flowing tuples constitute the feed; like a network tap, it
- * lets the data be routed to several paths.
- * <p>
- * The key is the primary feed id followed by the applied functions, all separated by
- * {@code ':'}. Equality and hashing are based solely on that string.
- */
-public class FeedJointKey {
-
-    private final FeedId primaryFeedId;
-    private final List<String> appliedFunctions;
-    private final String stringRep;
-
-    public FeedJointKey(FeedId feedId, List<String> appliedFunctions) {
-        this.primaryFeedId = feedId;
-        this.appliedFunctions = appliedFunctions;
-        // Computed once up front; equals, hashCode and toString all rely on it.
-        this.stringRep = feedId + ":" + StringUtils.join(appliedFunctions, ':');
-    }
-
-    public FeedId getFeedId() {
-        return primaryFeedId;
-    }
-
-    public List<String> getAppliedFunctions() {
-        return appliedFunctions;
-    }
-
-    public String getStringRep() {
-        return stringRep;
-    }
-
-    @Override
-    public final String toString() {
-        return stringRep;
-    }
-
-    @Override
-    public int hashCode() {
-        return stringRep.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        // instanceof is false for null, so no separate null check is needed.
-        return (o instanceof FeedJointKey) && stringRep.equals(((FeedJointKey) o).stringRep);
-    }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMemoryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMemoryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMemoryManager.java
deleted file mode 100644
index c39d82a..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMemoryManager.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.feeds.api.IFeedMemoryComponent;
-import org.apache.asterix.common.feeds.api.IFeedMemoryComponent.Type;
-import org.apache.asterix.common.feeds.api.IFeedMemoryManager;
-
-/**
- * Hands out feed memory components and tracks, in frames, how much of a fixed budget has
- * been committed. All accounting methods are synchronized on this instance.
- */
-public class FeedMemoryManager implements IFeedMemoryManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMemoryManager.class.getName());
-    private static final int ALLOCATION_INCREMENT = 10;
-
-    private final AtomicInteger componentId = new AtomicInteger(0);
-    private final String nodeId;
-    private final int budget;
-    private final int frameSize;
-
-    private int committed;
-
-    /**
-     * @param nodeId id of the node this manager runs on (used for reporting only)
-     * @param feedProperties source of the global memory budget
-     * @param frameSize size of one frame; the budget is converted into a number of frames
-     */
-    public FeedMemoryManager(String nodeId, AsterixFeedProperties feedProperties, int frameSize) {
-        this.nodeId = nodeId;
-        this.frameSize = frameSize;
-        // Divide first, then narrow: casting the budget itself would truncate it before the division.
-        budget = (int) (feedProperties.getMemoryComponentGlobalBudget() / frameSize);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Feed Memory budget " + budget + " frames (frame size=" + frameSize + ")");
-        }
-    }
-
-    /**
-     * Creates a memory component of the requested type if its start size still fits the budget.
-     *
-     * @param type the kind of component to create
-     * @return the new component, or {@code null} if the budget does not allow it
-     */
-    @Override
-    public synchronized IFeedMemoryComponent getMemoryComponent(Type type) {
-        IFeedMemoryComponent memoryComponent = null;
-        switch (type) {
-            case COLLECTION:
-                // NOTE(review): a collection's start size is not added to 'committed', although
-                // releaseMemoryComponent subtracts its total allocation - confirm this is intended.
-                if (committed + START_COLLECTION_SIZE <= budget) {
-                    memoryComponent = new FrameCollection(componentId.incrementAndGet(), this, START_COLLECTION_SIZE);
-                }
-                break;
-            case POOL:
-                if (committed + START_POOL_SIZE <= budget) {
-                    memoryComponent = new DataBucketPool(componentId.incrementAndGet(), this, START_POOL_SIZE,
-                            frameSize);
-                    // Commit only what was actually handed out; a refused request must not
-                    // consume budget.
-                    committed += START_POOL_SIZE;
-                }
-                break;
-        }
-        if (memoryComponent == null) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to allocate memory component of type" + type);
-            }
-        }
-        return memoryComponent;
-    }
-
-    /**
-     * Grows {@code memoryComponent} by a fixed increment if the budget allows it.
-     *
-     * @return {@code true} if the component was expanded, {@code false} if the budget is exhausted
-     */
-    @Override
-    public synchronized boolean expandMemoryComponent(IFeedMemoryComponent memoryComponent) {
-        if (committed + ALLOCATION_INCREMENT > budget) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Memory budget " + budget + " is exhausted. Space left: " + (budget - committed)
-                        + " frames.");
-            }
-            return false;
-        }
-        memoryComponent.expand(ALLOCATION_INCREMENT);
-        committed += ALLOCATION_INCREMENT;
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Expanded memory component " + memoryComponent + " by " + ALLOCATION_INCREMENT + " " + this);
-        }
-        return true;
-    }
-
-    /** Returns the component's total allocation to the budget and resets the component. */
-    @Override
-    public synchronized void releaseMemoryComponent(IFeedMemoryComponent memoryComponent) {
-        int delta = memoryComponent.getTotalAllocation();
-        committed -= delta;
-        memoryComponent.reset();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Reset " + memoryComponent + " and reclaimed " + delta + " frames " + this);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "FeedMemoryManager [" + nodeId + "]" + "(" + committed + "/" + budget + ")";
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMessageService.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMessageService.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMessageService.java
deleted file mode 100644
index 9582602..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMessageService.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.net.Socket;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.feeds.api.IFeedMessage;
-import org.apache.asterix.common.feeds.api.IFeedMessageService;
-
-/**
- * Sends feed report messages on behalf of an operator instance
- * to the SuperFeedManager associated with the feed.
- * <p>
- * Messages are queued by {@link #sendMessage(IFeedMessage)} and written, one JSON document
- * per line, to a socket by a single background thread.
- */
-public class FeedMessageService implements IFeedMessageService {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMessageService.class.getName());
-
-    private final LinkedBlockingQueue<String> inbox;
-    private final FeedMessageHandler mesgHandler;
-    private final String nodeId;
-    private ExecutorService executor;
-
-    public FeedMessageService(AsterixFeedProperties feedProperties, String nodeId, String ccClusterIp) {
-        this.inbox = new LinkedBlockingQueue<String>();
-        this.mesgHandler = new FeedMessageHandler(inbox, ccClusterIp, feedProperties.getFeedCentralManagerPort());
-        this.nodeId = nodeId;
-        this.executor = Executors.newSingleThreadExecutor();
-    }
-
-    /** Starts the background thread that connects and forwards queued messages. */
-    public void start() throws Exception {
-        executor.execute(mesgHandler);
-    }
-
-    /** Interrupts the background thread and closes the socket. */
-    public void stop() {
-        // Holding the handler's lock ensures the interrupt is not delivered while a message
-        // is being written.
-        synchronized (mesgHandler.getLock()) {
-            executor.shutdownNow();
-        }
-        mesgHandler.stop();
-    }
-
-    /**
-     * Adds the node id and message type to the JSON form of {@code message} and queues it.
-     * A message that cannot be converted to JSON is logged and dropped.
-     */
-    @Override
-    public void sendMessage(IFeedMessage message) {
-        try {
-            JSONObject obj = message.toJSON();
-            obj.put(FeedConstants.MessageConstants.NODE_ID, nodeId);
-            obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, message.getMessageType().name());
-            inbox.add(obj.toString());
-        } catch (JSONException jse) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("JSON exception in parsing message " + message + " exception [" + jse.getMessage() + "]");
-            }
-        }
-    }
-
-    /** Drains the inbox and writes each message, followed by a newline, to the socket. */
-    private static class FeedMessageHandler implements Runnable {
-
-        private final LinkedBlockingQueue<String> inbox;
-        private final String host;
-        private final int port;
-        private final Object lock;
-
-        // Written by the handler thread and read by stop() on the caller's thread.
-        private volatile Socket cfmSocket;
-
-        private static final byte[] EOL = "\n".getBytes();
-
-        public FeedMessageHandler(LinkedBlockingQueue<String> inbox, String host, int port) {
-            this.inbox = inbox;
-            this.host = host;
-            this.port = port;
-            this.lock = new Object();
-        }
-
-        @Override
-        public void run() {
-            try {
-                // The constructor either returns a connected socket or throws; it never yields null.
-                Socket socket = new Socket(host, port);
-                cfmSocket = socket;
-                while (true) {
-                    String message = inbox.take();
-                    synchronized (lock) { // lock prevents message handler from sending incomplete message midst shutdown attempt
-                        // NOTE(review): getBytes() uses the platform charset; confirm the receiver decodes the same way.
-                        socket.getOutputStream().write(message.getBytes());
-                        socket.getOutputStream().write(EOL);
-                    }
-                }
-            } catch (InterruptedException e) {
-                // Normal shutdown path: stop() interrupts the blocking take(). Keep the
-                // interrupt flag set for the executor and leave quietly.
-                Thread.currentThread().interrupt();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Feed message service interrupted; shutting down");
-                }
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.log(Level.WARNING, "Exception in handling incoming feed messages" + e.getMessage(), e);
-                }
-            } finally {
-                stop();
-            }
-
-        }
-
-        /** Closes the socket if one was opened; a failure to close is logged, not thrown. */
-        public void stop() {
-            Socket socket = cfmSocket;
-            if (socket != null) {
-                try {
-                    socket.close();
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Exception in closing socket " + e.getMessage());
-                    }
-                }
-            }
-        }
-
-        public Object getLock() {
-            return lock;
-        }
-
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMetricCollector.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMetricCollector.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMetricCollector.java
deleted file mode 100644
index 7b76692..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMetricCollector.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector;
-
-public class FeedMetricCollector implements IFeedMetricCollector {
-
- private static final Logger LOGGER = Logger.getLogger(FeedMetricCollector.class.getName());
-
- private static final int UNKNOWN = -1;
-
- private final AtomicInteger globalSenderId = new AtomicInteger(1);
- private final Map<Integer, Sender> senders = new HashMap<Integer, Sender>();
- private final Map<Integer, Series> statHistory = new HashMap<Integer, Series>();
- private final Map<String, Sender> sendersByName = new HashMap<String, Sender>();
-
- public FeedMetricCollector(String nodeId) {
- }
-
- @Override
- public synchronized int createReportSender(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
- ValueType valueType, MetricType metricType) {
- Sender sender = new Sender(globalSenderId.getAndIncrement(), connectionId, runtimeId, valueType, metricType);
- senders.put(sender.senderId, sender);
- sendersByName.put(sender.getDisplayName(), sender);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Sender id " + sender.getSenderId() + " created for " + sender);
- }
- return sender.senderId;
- }
-
- @Override
- public void removeReportSender(int senderId) {
- Sender sender = senders.get(senderId);
- if (sender != null) {
- statHistory.remove(senderId);
- senders.remove(senderId);
- } else {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to remove sender Id");
- }
- throw new IllegalStateException("Unable to remove sender Id " + senderId + " senders " + senders);
- }
- }
-
- @Override
- public boolean sendReport(int senderId, int value) {
- Sender sender = senders.get(senderId);
- if (sender != null) {
- Series series = statHistory.get(sender.senderId);
- if (series == null) {
- switch (sender.mType) {
- case AVG:
- series = new SeriesAvg();
- break;
- case RATE:
- series = new SeriesRate();
- break;
- }
- statHistory.put(sender.senderId, series);
- }
- series.addValue(value);
- return true;
- }
- throw new IllegalStateException("Unable to send report sender Id " + senderId + " senders " + senders);
- }
-
- @Override
- public void resetReportSender(int senderId) {
- Sender sender = senders.get(senderId);
- if (sender != null) {
- Series series = statHistory.get(sender.senderId);
- if (series != null) {
- series.reset();
- }
- } else {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Sender with id " + senderId + " not found. Unable to reset!");
- }
- throw new IllegalStateException("Unable to reset sender Id " + senderId + " senders " + senders);
- }
- }
-
- private static class Sender {
-
- private final int senderId;
- private final MetricType mType;
- private final String displayName;
-
- public Sender(int senderId, FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType,
- MetricType mType) {
- this.senderId = senderId;
- this.mType = mType;
- this.displayName = createDisplayName(connectionId, runtimeId, valueType);
- }
-
- @Override
- public String toString() {
- return displayName + "[" + senderId + "]" + "(" + mType + ")";
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof Sender)) {
- return false;
- }
- return ((Sender) o).senderId == senderId;
- }
-
- @Override
- public int hashCode() {
- return senderId;
- }
-
- public static String createDisplayName(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
- ValueType valueType) {
- return connectionId + " (" + runtimeId.getFeedRuntimeType() + " )" + "[" + runtimeId.getPartition() + "]"
- + "{" + valueType + "}";
- }
-
- public String getDisplayName() {
- return displayName;
- }
-
- public int getSenderId() {
- return senderId;
- }
- }
-
- @Override
- public int getMetric(int senderId) {
- Sender sender = senders.get(senderId);
- return getMetric(sender);
- }
-
- @Override
- public int getMetric(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType) {
- String displayName = Sender.createDisplayName(connectionId, runtimeId, valueType);
- Sender sender = sendersByName.get(displayName);
- return getMetric(sender);
- }
-
- private int getMetric(Sender sender) {
- if (sender == null || statHistory.get(sender.getSenderId()) == null) {
- return UNKNOWN;
- }
-
- float result = -1;
- Series series = statHistory.get(sender.getSenderId());
- switch (sender.mType) {
- case AVG:
- result = ((SeriesAvg) series).getAvg();
- break;
- case RATE:
- result = ((SeriesRate) series).getRate();
- break;
- }
- return (int) result;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedPolicyAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedPolicyAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedPolicyAccessor.java
deleted file mode 100644
index cd7d598..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedPolicyAccessor.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * A utility class to access the configuration parameters of a feed ingestion policy.
- * <p>
- * The policy is a map from parameter name to string value. Each accessor parses the value stored
- * under its key and falls back to a default when the key is absent. Numeric accessors propagate
- * the {@link NumberFormatException} raised by a malformed value.
- */
-public class FeedPolicyAccessor implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    // ---- failure configuration ----
-
-    /** continue feed ingestion after a soft (runtime) failure **/
-    public static final String SOFT_FAILURE_CONTINUE = "soft.failure.continue";
-
-    /** log failed tuple to an asterixdb dataset for future reference **/
-    public static final String SOFT_FAILURE_LOG_DATA = "soft.failure.log.data";
-
-    /** continue feed ingestion after loss of one or more machines (hardware failure) **/
-    public static final String HARDWARE_FAILURE_CONTINUE = "hardware.failure.continue";
-
-    /** auto-start a loser feed when the asterixdb instance is restarted **/
-    public static final String CLUSTER_REBOOT_AUTO_RESTART = "cluster.reboot.auto.restart";
-
-    /** framework provides guarantee that each received feed record will be processed through the ingestion pipeline at least once **/
-    public static final String AT_LEAST_ONE_SEMANTICS = "atleast.once.semantics";
-
-    // ---- flow control configuration ----
-
-    /** spill excess tuples to disk if an operator cannot process incoming data at its arrival rate **/
-    public static final String SPILL_TO_DISK_ON_CONGESTION = "spill.to.disk.on.congestion";
-
-    /** the maximum size of data (tuples) that can be spilled to disk **/
-    public static final String MAX_SPILL_SIZE_ON_DISK = "max.spill.size.on.disk";
-
-    /** discard tuples altogether if an operator cannot process incoming data at its arrival rate **/
-    public static final String DISCARD_ON_CONGESTION = "discard.on.congestion";
-
-    /** maximum fraction of ingested data that can be discarded **/
-    public static final String MAX_FRACTION_DISCARD = "max.fraction.discard";
-
-    /** maximum end-to-end delay/latency in persisting a tuple through the feed ingestion pipeline **/
-    public static final String MAX_DELAY_RECORD_PERSISTENCE = "max.delay.record.persistence";
-
-    /** rate limit the inflow of tuples in accordance with the maximum capacity of the pipeline **/
-    public static final String THROTTLING_ENABLED = "throttling.enabled";
-
-    /** elasticity **/
-    public static final String ELASTIC = "elastic";
-
-    /** statistics **/
-    public static final String TIME_TRACKING = "time.tracking";
-
-    /** logging of statistics **/
-    public static final String LOGGING_STATISTICS = "logging.statistics";
-
-    /** Default returned by {@link #getMaxSpillOnDisk()} when no spill limit is configured. */
-    public static final long NO_LIMIT = -1;
-
-    private Map<String, String> feedPolicy;
-
-    /**
-     * @param feedPolicy
-     *            the policy parameters, keyed by the constants declared in this class
-     */
-    public FeedPolicyAccessor(Map<String, String> feedPolicy) {
-        this.feedPolicy = feedPolicy;
-    }
-
-    /** @return the policy map currently backing this accessor */
-    public Map<String, String> getFeedPolicy() {
-        return feedPolicy;
-    }
-
-    /** Replaces the policy map backing this accessor. */
-    public void reset(Map<String, String> feedPolicy) {
-        this.feedPolicy = feedPolicy;
-    }
-
-    // ---- failure recovery / reporting ----
-
-    public boolean logDataOnSoftFailure() {
-        return getBooleanPropertyValue(SOFT_FAILURE_LOG_DATA, false);
-    }
-
-    public boolean continueOnSoftFailure() {
-        return getBooleanPropertyValue(SOFT_FAILURE_CONTINUE, false);
-    }
-
-    public boolean continueOnHardwareFailure() {
-        return getBooleanPropertyValue(HARDWARE_FAILURE_CONTINUE, false);
-    }
-
-    public boolean autoRestartOnClusterReboot() {
-        return getBooleanPropertyValue(CLUSTER_REBOOT_AUTO_RESTART, false);
-    }
-
-    public boolean atleastOnceSemantics() {
-        return getBooleanPropertyValue(AT_LEAST_ONE_SEMANTICS, false);
-    }
-
-    // ---- flow control ----
-
-    public boolean spillToDiskOnCongestion() {
-        return getBooleanPropertyValue(SPILL_TO_DISK_ON_CONGESTION, false);
-    }
-
-    /**
-     * Discarding counts as enabled when a positive {@link #MAX_FRACTION_DISCARD} is configured;
-     * the {@link #DISCARD_ON_CONGESTION} key itself is not consulted here.
-     */
-    public boolean discardOnCongestion() {
-        return getMaxFractionDiscard() > 0;
-    }
-
-    public boolean throttlingEnabled() {
-        return getBooleanPropertyValue(THROTTLING_ENABLED, false);
-    }
-
-    /** @return the configured spill limit, or {@link #NO_LIMIT} if none is set */
-    public long getMaxSpillOnDisk() {
-        return getLongPropertyValue(MAX_SPILL_SIZE_ON_DISK, NO_LIMIT);
-    }
-
-    /** @return the configured discard fraction, or 0 if none is set */
-    public float getMaxFractionDiscard() {
-        return getFloatPropertyValue(MAX_FRACTION_DISCARD, 0);
-    }
-
-    /** @return the configured maximum delay, or {@link Long#MAX_VALUE} if none is set */
-    public long getMaxDelayRecordPersistence() {
-        return getLongPropertyValue(MAX_DELAY_RECORD_PERSISTENCE, Long.MAX_VALUE);
-    }
-
-    // ---- elasticity ----
-
-    public boolean isElastic() {
-        return getBooleanPropertyValue(ELASTIC, false);
-    }
-
-    // ---- statistics ----
-
-    public boolean isTimeTrackingEnabled() {
-        return getBooleanPropertyValue(TIME_TRACKING, false);
-    }
-
-    public boolean isLoggingStatisticsEnabled() {
-        return getBooleanPropertyValue(LOGGING_STATISTICS, false);
-    }
-
-    /** Parses the value under {@code key} as a boolean, or returns {@code defValue} if the key is absent. */
-    private boolean getBooleanPropertyValue(String key, boolean defValue) {
-        String v = feedPolicy.get(key);
-        // Honour the caller's default instead of a hard-coded false.
-        return v == null ? defValue : Boolean.parseBoolean(v);
-    }
-
-    /** Parses the value under {@code key} as a long, or returns {@code defValue} if the key is absent. */
-    private long getLongPropertyValue(String key, long defValue) {
-        String v = feedPolicy.get(key);
-        return v != null ? Long.parseLong(v) : defValue;
-    }
-
-    /** Parses the value under {@code key} as a float, or returns {@code defValue} if the key is absent. */
-    private float getFloatPropertyValue(String key, float defValue) {
-        String v = feedPolicy.get(key);
-        return v != null ? Float.parseFloat(v) : defValue;
-    }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntime.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntime.java
deleted file mode 100644
index 87276ec..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntime.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
-import org.apache.asterix.common.feeds.api.IFeedRuntime;
-import org.apache.hyracks.api.comm.IFrameWriter;
-
-/**
- * Basic {@link IFeedRuntime} implementation that bundles a runtime id with the input handler and
- * the output frame writer of that runtime.
- */
-public class FeedRuntime implements IFeedRuntime {
-
-    /** A unique identifier for the runtime **/
-    protected final FeedRuntimeId runtimeId;
-
-    /** The output frame writer associated with the runtime **/
-    protected IFrameWriter frameWriter;
-
-    /** The pre-processor associated with the runtime **/
-    protected FeedRuntimeInputHandler inputHandler;
-
-    /**
-     * @param runtimeId
-     *            identifier of this runtime
-     * @param inputHandler
-     *            input-side handler of this runtime; {@link #getMode()} tolerates null here
-     * @param frameWriter
-     *            writer that receives the output of this runtime
-     */
-    public FeedRuntime(FeedRuntimeId runtimeId, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter) {
-        this.runtimeId = runtimeId;
-        this.inputHandler = inputHandler;
-        this.frameWriter = frameWriter;
-    }
-
-    @Override
-    public FeedRuntimeId getRuntimeId() {
-        return runtimeId;
-    }
-
-    @Override
-    public FeedRuntimeInputHandler getInputHandler() {
-        return inputHandler;
-    }
-
-    @Override
-    public IFrameWriter getFeedFrameWriter() {
-        return frameWriter;
-    }
-
-    /** Replaces the output frame writer of this runtime. */
-    public void setFrameWriter(IFeedOperatorOutputSideHandler frameWriter) {
-        this.frameWriter = frameWriter;
-    }
-
-    /**
-     * @return the mode reported by the input handler, or {@code Mode.PROCESS} when this runtime
-     *         has no input handler
-     */
-    public Mode getMode() {
-        if (inputHandler == null) {
-            return Mode.PROCESS;
-        }
-        return inputHandler.getMode();
-    }
-
-    /**
-     * Forwards the mode to the input handler. Unlike {@link #getMode()}, this does not guard
-     * against a missing input handler.
-     */
-    public void setMode(Mode mode) {
-        this.inputHandler.setMode(mode);
-    }
-
-    @Override
-    public String toString() {
-        return runtimeId.toString();
-    }
-
-}