You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/05/15 19:04:00 UTC
[7/9] incubator-asterixdb git commit: Cleanup Feed CodeBase
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetricCollector.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetricCollector.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetricCollector.java
deleted file mode 100644
index c35587c..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetricCollector.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-
-public interface IFeedMetricCollector {
-
- public enum ValueType {
- CPU_USAGE,
- INFLOW_RATE,
- OUTFLOW_RATE
- }
-
- public enum MetricType {
- AVG,
- RATE
- }
-
- public boolean sendReport(int senderId, int value);
-
- public int getMetric(int senderId);
-
- public int getMetric(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType);
-
- int createReportSender(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType,
- MetricType metricType);
-
- public void removeReportSender(int senderId);
-
- public void resetReportSender(int senderId);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedOperatorOutputSideHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedOperatorOutputSideHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedOperatorOutputSideHandler.java
deleted file mode 100644
index a8d0552..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedOperatorOutputSideHandler.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.hyracks.api.comm.IFrameWriter;
-
-/**
- * Provides for output-side buffering for a feed runtime.
- * Right now, we only have a single output side handler
- * and we can probably remove it completely.
- * ______
- * | |
- * ============|core |============
- * ============| op |============
- * |______|^^^^^^^^^^^^
- * Output Side
- * Handler
- *
- **/
-public interface IFeedOperatorOutputSideHandler extends IFrameWriter {
-
- public enum Type {
- BASIC_FEED_OUTPUT_HANDLER,
- DISTRIBUTE_FEED_OUTPUT_HANDLER,
- COLLECT_TRANSFORM_FEED_OUTPUT_HANDLER
- }
-
- public FeedId getFeedId();
-
- public Type getType();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedProvider.java
deleted file mode 100644
index 9eced07..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedProvider.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.management.FeedId;
-
-public interface IFeedProvider {
-
- public void subscribeFeed(FeedId sourceDeedId);
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
index 269725d..b8375e3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
@@ -18,9 +18,7 @@
*/
package org.apache.asterix.external.feed.api;
-import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.hyracks.api.comm.IFrameWriter;
public interface IFeedRuntime {
@@ -36,27 +34,14 @@ public interface IFeedRuntime {
}
public enum Mode {
- PROCESS,
- SPILL,
- PROCESS_SPILL,
- DISCARD,
- POST_SPILL_DISCARD,
- PROCESS_BACKLOG,
- STALL,
- FAIL,
- END
+ PROCESS, // There is memory
+ SPILL, // Memory budget has been consumed. Now we're writing to disk
+ DISCARD // Memory budget has been consumed. Disk space budget has been consumed. Now we're
+ // discarding
}
/**
* @return the unique runtime id associated with the feedRuntime
*/
public FeedRuntimeId getRuntimeId();
-
- /**
- * @return the frame writer associated with the feed runtime.
- */
- public IFrameWriter getFeedFrameWriter();
-
- public FeedRuntimeInputHandler getInputHandler();
-
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedService.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedService.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedService.java
deleted file mode 100644
index 3d3e0e5..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedService.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-public interface IFeedService {
-
- public void start() throws Exception;
-
- public void stop();
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedSubscriptionManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedSubscriptionManager.java
deleted file mode 100644
index ec4c396..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedSubscriptionManager.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
-
-public interface IFeedSubscriptionManager {
-
- /**
- * @param subscribableRuntime
- */
- public void registerFeedSubscribableRuntime(ISubscribableRuntime subscribableRuntime);
-
- /**
- * @param subscribableRuntimeId
- */
- public void deregisterFeedSubscribableRuntime(SubscribableFeedRuntimeId subscribableRuntimeId);
-
- /**
- * @param subscribableRuntimeId
- * @return
- */
- public ISubscribableRuntime getSubscribableRuntime(SubscribableFeedRuntimeId subscribableRuntimeId);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedTrackingManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedTrackingManager.java
deleted file mode 100644
index 6576e09..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedTrackingManager.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
-
-public interface IFeedTrackingManager {
-
- public void submitAckReport(FeedTupleCommitAckMessage ackMessage);
-
- public void disableAcking(FeedConnectionId connectionId);
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFrameEventCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFrameEventCallback.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFrameEventCallback.java
deleted file mode 100644
index 647d847..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFrameEventCallback.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-public interface IFrameEventCallback {
-
- public enum FrameEvent {
- FINISHED_PROCESSING,
- PENDING_WORK_THRESHOLD_REACHED,
- PENDING_WORK_DONE,
- NO_OP,
- FINISHED_PROCESSING_SPILLAGE
- }
-
- public void frameEvent(FrameEvent frameEvent);
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePostProcessor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePostProcessor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePostProcessor.java
deleted file mode 100644
index eab7a64..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePostProcessor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public interface IFramePostProcessor {
-
- public void postProcessFrame(ByteBuffer frame, FrameTupleAccessor frameAccessor);
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePreprocessor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePreprocessor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePreprocessor.java
deleted file mode 100644
index 55461b7..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePreprocessor.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import java.nio.ByteBuffer;
-
-public interface IFramePreprocessor {
-
- public void preProcess(ByteBuffer frame) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IIntakeProgressTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IIntakeProgressTracker.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IIntakeProgressTracker.java
deleted file mode 100644
index 4848ed8..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IIntakeProgressTracker.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import java.util.Map;
-
-public interface IIntakeProgressTracker {
-
- public void configure(Map<String, String> configuration);
-
- public void notifyIngestedTupleTimestamp(long timestamp);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IMessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IMessageReceiver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IMessageReceiver.java
deleted file mode 100644
index e58d99e..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IMessageReceiver.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-public interface IMessageReceiver<T> {
-
- public void sendMessage(T message) throws InterruptedException;
-
- public void close(boolean processPending);
-
- public void start();
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
index ee07188..1ca46ce 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
@@ -18,12 +18,8 @@
*/
package org.apache.asterix.external.feed.api;
-import java.util.List;
-
-import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.runtime.CollectionRuntime;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* Represent a feed runtime whose output can be routed along other parallel path(s).
@@ -34,28 +30,12 @@ public interface ISubscribableRuntime extends IFeedRuntime {
* @param collectionRuntime
* @throws Exception
*/
- public void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime) throws Exception;
+ public void subscribe(CollectionRuntime collectionRuntime) throws HyracksDataException;
/**
* @param collectionRuntime
+ * @throws InterruptedException
* @throws Exception
*/
- public void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception;
-
- /**
- * @return
- * @throws Exception
- */
- public List<ISubscriberRuntime> getSubscribers();
-
- /**
- * @return
- */
- public DistributeFeedFrameWriter getFeedFrameWriter();
-
- /**
- * @return
- */
- public RecordDescriptor getRecordDescriptor();
-
+ public void unsubscribe(CollectionRuntime collectionRuntime) throws HyracksDataException, InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java
index 4d3e607..1752054 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java
@@ -20,11 +20,8 @@ package org.apache.asterix.external.feed.api;
import java.util.Map;
-import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
-
public interface ISubscriberRuntime {
public Map<String, String> getFeedPolicy();
- public FeedFrameCollector getFrameCollector();
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriptionProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriptionProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriptionProvider.java
deleted file mode 100644
index b94a52e..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriptionProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.management.FeedId;
-
-public interface ISubscriptionProvider {
-
- public void subscribeFeed(FeedId sourceFeedId, FeedId recipientFeedId);
-
- public void unsubscribeFeed(FeedId sourceFeedId, FeedId recipientFeedId);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
deleted file mode 100644
index c4d5014..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-// Simply a delivery frame writer. I think we can simply get rid of this at some point {TODO}.
-public class CollectTransformFeedFrameWriter implements IFeedOperatorOutputSideHandler {
-
- private final FeedConnectionId connectionId; // [Dataverse - Feed - Dataset]
- private IFrameWriter downstreamWriter; // Writer to next (Operator/Connector)
- private final FrameTupleAccessor inputFrameTupleAccessor; // Accessing input frame tuples
- private final FrameTupleAppender tupleAppender; // Append tuples to output frame
- private final IFrame frame; // Output frame
-
- private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
-
- public CollectTransformFeedFrameWriter(IHyracksTaskContext ctx, IFrameWriter downstreamWriter,
- ISubscribableRuntime sourceRuntime, RecordDescriptor outputRecordDescriptor, FeedConnectionId connectionId)
- throws HyracksDataException {
- this.connectionId = connectionId;
- this.downstreamWriter = downstreamWriter;
- inputFrameTupleAccessor = new FrameTupleAccessor(sourceRuntime.getRecordDescriptor());
- frame = new VSizeFrame(ctx);
- tupleAppender = new FrameTupleAppender(frame);
- }
-
- @Override
- public void open() throws HyracksDataException {
- downstreamWriter.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- // always project the first field only. why?
- inputFrameTupleAccessor.reset(buffer);
- int nTuple = inputFrameTupleAccessor.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- tupleBuilder.addField(inputFrameTupleAccessor, t, 0);
- appendTupleToFrame();
- tupleBuilder.reset();
- }
- }
-
- private void appendTupleToFrame() throws HyracksDataException {
- if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(frame.getBuffer(), downstreamWriter);
- tupleAppender.reset(frame, true);
- if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException();
- }
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- downstreamWriter.fail();
- }
-
- @Override
- public void close() throws HyracksDataException {
- downstreamWriter.close();
- }
-
- @Override
- public FeedId getFeedId() {
- return connectionId.getFeedId();
- }
-
- @Override
- public Type getType() {
- return Type.COLLECT_TRANSFORM_FEED_OUTPUT_HANDLER;
- }
-
- public IFrameWriter getDownstreamWriter() {
- return downstreamWriter;
- }
-
- public FeedConnectionId getConnectionId() {
- return connectionId;
- }
-
- public void reset(IFrameWriter writer) {
- this.downstreamWriter = writer;
- }
-
- @Override
- public void flush() throws HyracksDataException {
- tupleAppender.flush(downstreamWriter);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucket.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucket.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucket.java
deleted file mode 100644
index 7f63d1d..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucket.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * A {@DataBucket} is a wrapper around {@ByteBuffer} that expects certain number of receipients
- */
-public class DataBucket {
-
- private static final AtomicInteger globalBucketId = new AtomicInteger(0);
-
- private final ByteBuffer content; // Content
- private final AtomicInteger readCount; // How many reads?
- private final int bucketId; // Id
- private int desiredReadCount; // Number of expected readers
- private ContentType contentType; // Data, End of stream, or End of spilled data
- private final DataBucketPool pool; // Pool of buckets
-
- public enum ContentType {
- DATA, // data (feed tuple)
- EOD, // A signal indicating that there shall be no more data
- EOSD // End of processing of spilled data
- }
-
- public DataBucket(DataBucketPool pool) {
- this.content = ByteBuffer.allocate(pool.getFrameSize());
- this.readCount = new AtomicInteger(0);
- this.pool = pool;
- this.contentType = ContentType.DATA;
- this.bucketId = globalBucketId.incrementAndGet();
- }
-
- public synchronized void reset(ByteBuffer frame) {
- if (frame != null) {
- content.flip();
- System.arraycopy(frame.array(), 0, content.array(), 0, frame.limit());
- content.limit(frame.limit());
- content.position(0);
- }
- }
-
- public synchronized void doneReading() {
- if (readCount.incrementAndGet() == desiredReadCount) {
- readCount.set(0);
- pool.returnDataBucket(this);
- }
- }
-
- public void setDesiredReadCount(int rCount) {
- this.desiredReadCount = rCount;
- }
-
- public ContentType getContentType() {
- return contentType;
- }
-
- public void setContentType(ContentType contentType) {
- this.contentType = contentType;
- }
-
- public synchronized ByteBuffer getContent() {
- return content;
- }
-
- @Override
- public String toString() {
- return "DataBucket [" + bucketId + "]" + " (" + readCount + "," + desiredReadCount + ")";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucketPool.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucketPool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucketPool.java
deleted file mode 100644
index d1dea51..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucketPool.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.util.Stack;
-
-import org.apache.asterix.external.feed.api.IFeedMemoryComponent;
-import org.apache.asterix.external.feed.api.IFeedMemoryManager;
-
-/**
- * Represents a pool of reusable {@link DataBucket}
- */
-public class DataBucketPool implements IFeedMemoryComponent {
-
- /** A unique identifier for the memory component **/
- private final int componentId;
-
- /** The {@link IFeedMemoryManager} for the NodeController **/
- private final IFeedMemoryManager memoryManager;
-
- /** A collection of available data buckets {@link DataBucket} **/
- private final Stack<DataBucket> pool;
-
- /** The total number of data buckets {@link DataBucket} allocated **/
- private int totalAllocation;
-
- /** The fixed frame size as configured for the asterix runtime **/
- private final int frameSize;
-
- public DataBucketPool(int componentId, IFeedMemoryManager memoryManager, int size, int frameSize) {
- this.componentId = componentId;
- this.memoryManager = memoryManager;
- this.pool = new Stack<DataBucket>();
- this.frameSize = frameSize;
- expand(size);
- }
-
- public synchronized void returnDataBucket(DataBucket bucket) {
- pool.push(bucket);
- }
-
- public synchronized DataBucket getDataBucket() {
- if (pool.size() == 0) {
- if (!memoryManager.expandMemoryComponent(this)) {
- return null;
- }
- }
- return pool.pop();
- }
-
- @Override
- public Type getType() {
- return Type.POOL;
- }
-
- @Override
- public int getTotalAllocation() {
- return totalAllocation;
- }
-
- @Override
- public int getComponentId() {
- return componentId;
- }
-
- @Override
- public void expand(int delta) {
- for (int i = 0; i < delta; i++) {
- DataBucket bucket = new DataBucket(this);
- pool.add(bucket);
- }
- totalAllocation += delta;
- }
-
- @Override
- public void reset() {
- totalAllocation -= pool.size();
- pool.clear();
- }
-
- @Override
- public String toString() {
- return "DataBucketPool" + "[" + componentId + "]" + "(" + totalAllocation + ")";
- }
-
- public int getSize() {
- return pool.size();
- }
-
- public int getFrameSize() {
- return frameSize;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
index 8b7e2ba..f356899 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
@@ -20,17 +20,10 @@ package org.apache.asterix.external.feed.dataflow;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler;
-import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler.Type;
import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -44,8 +37,6 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
**/
public class DistributeFeedFrameWriter implements IFrameWriter {
- private static final Logger LOGGER = Logger.getLogger(DistributeFeedFrameWriter.class.getName());
-
/** A unique identifier for the feed to which the incoming tuples belong. **/
private final FeedId feedId;
@@ -56,7 +47,7 @@ public class DistributeFeedFrameWriter implements IFrameWriter {
private final FrameDistributor frameDistributor;
/** The original frame writer instantiated as part of job creation **/
- private IFrameWriter writer;
+ private final IFrameWriter writer;
/** The feed operation whose output is being distributed by the DistributeFeedFrameWriter **/
private final FeedRuntimeType feedRuntimeType;
@@ -65,11 +56,9 @@ public class DistributeFeedFrameWriter implements IFrameWriter {
private final int partition;
public DistributeFeedFrameWriter(IHyracksTaskContext ctx, FeedId feedId, IFrameWriter writer,
- FeedRuntimeType feedRuntimeType, int partition, FrameTupleAccessor fta, IFeedManager feedManager)
- throws IOException {
+ FeedRuntimeType feedRuntimeType, int partition, FrameTupleAccessor fta) throws IOException {
this.feedId = feedId;
- this.frameDistributor = new FrameDistributor(feedId, feedRuntimeType, partition, true,
- feedManager.getFeedMemoryManager(), fta);
+ this.frameDistributor = new FrameDistributor();
this.feedRuntimeType = feedRuntimeType;
this.partition = partition;
this.writer = writer;
@@ -78,38 +67,19 @@ public class DistributeFeedFrameWriter implements IFrameWriter {
/**
* @param fpa
* Feed policy accessor
- * @param frameWriter
+ * @param nextOnlyWriter
* the writer which will deliver the buffers
* @param connectionId
* (Dataverse - Dataset - Feed)
* @return A frame collector.
- * @throws Exception
+ * @throws HyracksDataException
*/
- public FeedFrameCollector subscribeFeed(FeedPolicyAccessor fpa, IFrameWriter frameWriter,
- FeedConnectionId connectionId) throws Exception {
- FeedFrameCollector collector = null;
- if (!frameDistributor.isRegistered(frameWriter)) {
- collector = new FeedFrameCollector(frameDistributor, fpa, frameWriter, connectionId);
- frameDistributor.registerFrameCollector(collector);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Registered subscriber, new mode " + frameDistributor.getMode());
- }
- return collector;
- } else {
- throw new IllegalStateException("subscriber " + feedId + " already registered");
- }
+ public void subscribe(FeedFrameCollector collector) throws HyracksDataException {
+ frameDistributor.registerFrameCollector(collector);
}
- public void unsubscribeFeed(IFrameWriter recipientFeedFrameWriter) throws Exception {
- boolean success = frameDistributor.deregisterFrameCollector(recipientFeedFrameWriter);
- if (!success) {
- throw new IllegalStateException(
- "Invalid attempt to unregister FeedFrameWriter " + recipientFeedFrameWriter + " not registered.");
- }
- }
-
- public void notifyEndOfFeed() throws InterruptedException {
- frameDistributor.notifyEndOfFeed();
+ public void unsubscribeFeed(FeedConnectionId connectionId) throws HyracksDataException {
+ frameDistributor.deregisterFrameCollector(connectionId);
}
@Override
@@ -136,27 +106,11 @@ public class DistributeFeedFrameWriter implements IFrameWriter {
writer.open();
}
- public Map<IFrameWriter, FeedFrameCollector> getRegisteredReaders() {
- return frameDistributor.getRegisteredReaders();
- }
-
- public void setWriter(IFrameWriter writer) {
- this.writer = writer;
- }
-
- public Type getType() {
- return IFeedOperatorOutputSideHandler.Type.DISTRIBUTE_FEED_OUTPUT_HANDLER;
- }
-
@Override
public String toString() {
return feedId.toString() + feedRuntimeType + "[" + partition + "]";
}
- public FrameDistributor.DistributionMode getDistributionMode() {
- return frameDistributor.getDistributionMode();
- }
-
@Override
public void flush() throws HyracksDataException {
frameDistributor.flush();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedCollectRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedCollectRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedCollectRuntimeInputHandler.java
deleted file mode 100644
index 447c4bc..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedCollectRuntimeInputHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class FeedCollectRuntimeInputHandler extends FeedRuntimeInputHandler {
-
- private final FeedFrameCache feedFrameCache;
-
- public FeedCollectRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId,
- FeedRuntimeId runtimeId, IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled,
- FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions)
- throws IOException {
- super(ctx, connectionId, runtimeId, coreOperator, fpa, bufferingEnabled, fta, recordDesc, feedManager,
- nPartitions);
- this.feedFrameCache = new FeedFrameCache(ctx, fta, coreOperator);
- }
-
- public void replayFrom(int recordId) throws HyracksDataException {
- feedFrameCache.replayRecords(recordId);
- }
-
- public void dropTill(int recordId) {
- feedFrameCache.dropTillRecordId(recordId);
- }
-
- public void replayCached() throws HyracksDataException {
- feedFrameCache.replayAll();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
index f102f93..b98f123 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
@@ -24,12 +24,9 @@ import java.util.logging.Logger;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.FrameDataException;
-import org.apache.asterix.external.feed.api.IExceptionHandler;
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.common.exceptions.IExceptionHandler;
import org.apache.asterix.external.util.FeedFrameUtil;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -37,12 +34,11 @@ public class FeedExceptionHandler implements IExceptionHandler {
private static Logger LOGGER = Logger.getLogger(FeedExceptionHandler.class.getName());
- //TODO: Enable logging
+ // TODO: Enable logging
private final IHyracksTaskContext ctx;
private final FrameTupleAccessor fta;
- public FeedExceptionHandler(IHyracksTaskContext ctx, FrameTupleAccessor fta, RecordDescriptor recordDesc,
- IFeedManager feedManager, FeedConnectionId connectionId) {
+ public FeedExceptionHandler(IHyracksTaskContext ctx, FrameTupleAccessor fta) {
this.ctx = ctx;
this.fta = fta;
}
@@ -53,20 +49,21 @@ public class FeedExceptionHandler implements IExceptionHandler {
}
@Override
- public ByteBuffer handleException(Exception e, ByteBuffer frame) {
+ public ByteBuffer handle(HyracksDataException th, ByteBuffer frame) {
try {
- if (e instanceof FrameDataException) {
+ if (th instanceof FrameDataException) {
fta.reset(frame);
- FrameDataException fde = (FrameDataException) e;
+ FrameDataException fde = (FrameDataException) th;
int tupleIndex = fde.getTupleIndex();
try {
- logExceptionCausingTuple(tupleIndex, e);
+ logExceptionCausingTuple(tupleIndex, th);
} catch (Exception ex) {
- ex.addSuppressed(e);
+ ex.addSuppressed(th);
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Unable to log exception causing tuple due to..." + ex.getMessage());
}
}
+ // TODO: Improve removeBadTuple. Right now, it creates lots of objects
return FeedFrameUtil.removeBadTuple(ctx, tupleIndex, fta);
} else {
return null;
@@ -80,6 +77,8 @@ public class FeedExceptionHandler implements IExceptionHandler {
}
}
- private void logExceptionCausingTuple(int tupleIndex, Exception e) throws HyracksDataException, AsterixException {
+ // TODO: Fix logging of exceptions
+ private void logExceptionCausingTuple(int tupleIndex, Throwable e) throws HyracksDataException, AsterixException {
+ LOGGER.log(Level.WARNING, e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
deleted file mode 100644
index 159bc43..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.asterix.external.feed.message.MessageReceiver;
-import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * Allows caching of feed frames. This class is used in providing upstream backup.
- * The tuples at the intake layer are held in this cache until these are acked by
- * the storage layer post their persistence. On receiving an ack, appropriate tuples
- * (recordsId < ackedRecordId) are dropped from the cache.
- */
-public class FeedFrameCache extends MessageReceiver<ByteBuffer> {
-
- /**
- * Value represents a cache feed frame
- * Key represents the largest record Id in the frame.
- * At the intake side, the largest record id corresponds to the last record in the frame
- **/
- private final Map<Integer, ByteBuffer> orderedCache;
- private final FrameTupleAccessor tupleAccessor;
- private final IFrameWriter frameWriter;
- private final IHyracksTaskContext ctx;
-
- public FeedFrameCache(IHyracksTaskContext ctx, FrameTupleAccessor tupleAccessor, IFrameWriter frameWriter) {
- this.tupleAccessor = tupleAccessor;
- this.frameWriter = frameWriter;
- /** A LinkedHashMap ensures entries are retrieved in order of their insertion **/
- this.orderedCache = new LinkedHashMap<Integer, ByteBuffer>();
- this.ctx = ctx;
- }
-
- @Override
- public void processMessage(ByteBuffer frame) throws Exception {
- int lastRecordId = getLastRecordId(frame);
- ByteBuffer clone = cloneFrame(frame);
- orderedCache.put(lastRecordId, clone);
- }
-
- public void dropTillRecordId(int recordId) {
- List<Integer> dropRecordIds = new ArrayList<Integer>();
- for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
- int recId = entry.getKey();
- if (recId <= recordId) {
- dropRecordIds.add(recId);
- } else {
- break;
- }
- }
- for (Integer r : dropRecordIds) {
- orderedCache.remove(r);
- }
- }
-
- public void replayRecords(int startingRecordId) throws HyracksDataException {
- boolean replayPositionReached = false;
- for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
- // the key increases monotonically
- int maxRecordIdInFrame = entry.getKey();
- if (!replayPositionReached) {
- if (startingRecordId < maxRecordIdInFrame) {
- replayFrame(startingRecordId, entry.getValue());
- break;
- } else {
- continue;
- }
- }
- }
- }
-
- /**
- * Replay the frame from the tuple (inclusive) with recordId as specified.
- *
- * @param recordId
- * @param frame
- * @throws HyracksDataException
- */
- private void replayFrame(int recordId, ByteBuffer frame) throws HyracksDataException {
- tupleAccessor.reset(frame);
- int nTuples = tupleAccessor.getTupleCount();
- for (int i = 0; i < nTuples; i++) {
- int rid = getRecordIdAtTupleIndex(i, frame);
- if (rid == recordId) {
- ByteBuffer slicedFrame = splitFrame(i, frame);
- replayFrame(slicedFrame);
- break;
- }
- }
- }
-
- private ByteBuffer splitFrame(int beginTupleIndex, ByteBuffer frame) throws HyracksDataException {
- IFrame slicedFrame = new VSizeFrame(ctx);
- FrameTupleAppender appender = new FrameTupleAppender();
- appender.reset(slicedFrame, true);
- int totalTuples = tupleAccessor.getTupleCount();
- for (int ti = beginTupleIndex; ti < totalTuples; ti++) {
- appender.append(tupleAccessor, ti);
- }
- return slicedFrame.getBuffer();
- }
-
- /**
- * Replay the frame
- *
- * @param frame
- * @throws HyracksDataException
- */
- private void replayFrame(ByteBuffer frame) throws HyracksDataException {
- frameWriter.nextFrame(frame);
- }
-
- private int getLastRecordId(ByteBuffer frame) {
- tupleAccessor.reset(frame);
- int nTuples = tupleAccessor.getTupleCount();
- return getRecordIdAtTupleIndex(nTuples - 1, frame);
- }
-
- private int getRecordIdAtTupleIndex(int tupleIndex, ByteBuffer frame) {
- tupleAccessor.reset(frame);
- int recordStart = tupleAccessor.getTupleStartOffset(tupleIndex) + tupleAccessor.getFieldSlotsLength();
- int openPartOffset = frame.getInt(recordStart + 6);
- int numOpenFields = frame.getInt(recordStart + openPartOffset);
- int recordIdOffset = frame.getInt(recordStart + openPartOffset + 4 + numOpenFields * 8
- + StatisticsConstants.INTAKE_TUPLEID.length() + 2 + 1);
- int lastRecordId = frame.getInt(recordStart + recordIdOffset);
- return lastRecordId;
- }
-
- private ByteBuffer cloneFrame(ByteBuffer frame) {
- ByteBuffer clone = ByteBuffer.allocate(frame.capacity());
- System.arraycopy(frame.array(), 0, clone.array(), 0, frame.limit());
- return clone;
- }
-
- public void replayAll() throws HyracksDataException {
- for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
- ByteBuffer frame = entry.getValue();
- frameWriter.nextFrame(frame);
- }
- }
-
- @Override
- public void emptyInbox() throws HyracksDataException {
- frameWriter.flush();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
index ef4b87d..b81f59a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
@@ -19,21 +19,16 @@
package org.apache.asterix.external.feed.dataflow;
import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.MessageReceiver;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class FeedFrameCollector extends MessageReceiver<DataBucket> {
+public class FeedFrameCollector implements IFrameWriter {
private final FeedConnectionId connectionId;
- private final FrameDistributor frameDistributor;
- private FeedPolicyAccessor fpa;
- private IFrameWriter frameWriter;
+ private IFrameWriter writer;
private State state;
public enum State {
@@ -43,70 +38,27 @@ public class FeedFrameCollector extends MessageReceiver<DataBucket> {
HANDOVER
}
- public FeedFrameCollector(FrameDistributor frameDistributor, FeedPolicyAccessor feedPolicyAccessor,
- IFrameWriter frameWriter, FeedConnectionId connectionId) {
- super();
- this.frameDistributor = frameDistributor;
- this.fpa = feedPolicyAccessor;
+ public FeedFrameCollector(FeedPolicyAccessor feedPolicyAccessor, IFrameWriter writer,
+ FeedConnectionId connectionId) {
this.connectionId = connectionId;
- this.frameWriter = frameWriter;
+ this.writer = writer;
this.state = State.ACTIVE;
}
@Override
- public void processMessage(DataBucket bucket) throws Exception {
- try {
- ByteBuffer frame = bucket.getContent();
- switch (bucket.getContentType()) {
- case DATA:
- frameWriter.nextFrame(frame);
- break;
- case EOD:
- closeCollector();
- break;
- case EOSD:
- throw new AsterixException("Received data bucket with content of type " + bucket.getContentType());
- }
- } catch (Exception e) {
- e.printStackTrace();
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to process data bucket " + bucket + ", encountered exception " + e.getMessage());
- }
- } finally {
- bucket.doneReading();
- }
- }
-
- public void closeCollector() {
- if (state.equals(State.TRANSITION)) {
- super.close(true);
- setState(State.ACTIVE);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(this + " is now " + State.ACTIVE + " mode, processing frames synchronously");
- }
- } else {
- flushPendingMessages();
- setState(State.FINISHED);
- synchronized (frameDistributor.getRegisteredCollectors()) {
- frameDistributor.getRegisteredCollectors().notifyAll();
- }
- disconnect();
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Closed collector " + this);
- }
+ public synchronized void close() throws HyracksDataException {
+ writer.close();
+ state = State.FINISHED;
+ notify();
}
public synchronized void disconnect() {
setState(State.FINISHED);
}
+ @Override
public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
- frameWriter.nextFrame(frame);
- }
-
- public FeedPolicyAccessor getFeedPolicyAccessor() {
- return fpa;
+ writer.nextFrame(frame);
}
public synchronized State getState() {
@@ -123,17 +75,14 @@ public class FeedFrameCollector extends MessageReceiver<DataBucket> {
default:
break;
}
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Frame Collector " + this.frameDistributor.getFeedRuntimeType() + " switched to " + state);
- }
}
public IFrameWriter getFrameWriter() {
- return frameWriter;
+ return writer;
}
- public void setFrameWriter(IFrameWriter frameWriter) {
- this.frameWriter = frameWriter;
+ public void setFrameWriter(IFrameWriter writer) {
+ this.writer = writer;
}
@Override
@@ -158,12 +107,21 @@ public class FeedFrameCollector extends MessageReceiver<DataBucket> {
}
@Override
- public void emptyInbox() throws HyracksDataException {
- flush();
+ public synchronized void flush() throws HyracksDataException {
+ writer.flush();
}
- public synchronized void flush() throws HyracksDataException {
- frameWriter.flush();
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
}
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameDiscarder.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameDiscarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameDiscarder.java
deleted file mode 100644
index 53ee475..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameDiscarder.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class FeedFrameDiscarder {
-
- private static final Logger LOGGER = Logger.getLogger(FeedFrameSpiller.class.getName());
-
- private final FeedRuntimeInputHandler inputHandler;
- private final FeedConnectionId connectionId;
- private final FeedRuntimeId runtimeId;
- private final FeedPolicyAccessor policyAccessor;
- private final float maxFractionDiscard;
- private int nDiscarded;
-
- public FeedFrameDiscarder(FeedConnectionId connectionId, FeedRuntimeId runtimeId, FeedPolicyAccessor policyAccessor,
- FeedRuntimeInputHandler inputHandler) throws HyracksDataException {
- this.connectionId = connectionId;
- this.runtimeId = runtimeId;
- this.policyAccessor = policyAccessor;
- this.inputHandler = inputHandler;
- this.maxFractionDiscard = policyAccessor.getMaxFractionDiscard();
- }
-
- public boolean processMessage(ByteBuffer message) {
- if (policyAccessor.getMaxFractionDiscard() != 0) {
- long nProcessed = inputHandler.getProcessed();
- long discardLimit = (long) (nProcessed * maxFractionDiscard);
- if (nDiscarded >= discardLimit) {
- return false;
- }
- nDiscarded++;
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Discarded frame by " + connectionId + " (" + runtimeId + ")" + " count so far ("
- + nDiscarded + ") Limit [" + discardLimit + "]");
- }
- return true;
- }
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
deleted file mode 100644
index 2a6fd79..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IFeedFrameHandler;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.message.MessageReceiver;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class FeedFrameHandlers {
-
- private static final Logger LOGGER = Logger.getLogger(FeedFrameHandlers.class.getName());
-
- public enum RoutingMode {
- IN_MEMORY_ROUTE,
- SPILL_TO_DISK,
- DISCARD
- }
-
- public static IFeedFrameHandler getFeedFrameHandler(FrameDistributor distributor, FeedId feedId,
- RoutingMode routingMode, FeedRuntimeType runtimeType, int partition, int frameSize) throws IOException {
- IFeedFrameHandler handler = null;
- switch (routingMode) {
- case IN_MEMORY_ROUTE:
- handler = new InMemoryRouter(distributor.getRegisteredReaders().values(), runtimeType, partition);
- break;
- case SPILL_TO_DISK:
- handler = new DiskSpiller(distributor, feedId, runtimeType, partition, frameSize);
- break;
- case DISCARD:
- handler = new DiscardRouter(distributor, feedId, runtimeType, partition);
- break;
- default:
- throw new IllegalArgumentException("Invalid routing mode" + routingMode);
- }
- return handler;
- }
-
- public static class DiscardRouter implements IFeedFrameHandler {
-
- private final FeedId feedId;
- private int nDiscarded;
- private final FeedRuntimeType runtimeType;
- private final int partition;
- private final FrameDistributor distributor;
-
- public DiscardRouter(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition)
- throws HyracksDataException {
- this.distributor = distributor;
- this.feedId = feedId;
- this.nDiscarded = 0;
- this.runtimeType = runtimeType;
- this.partition = partition;
- }
-
- @Override
- public void handleFrame(ByteBuffer frame) throws HyracksDataException {
- FrameTupleAccessor fta = distributor.getFta();
- fta.reset(frame);
- int nTuples = fta.getTupleCount();
- nDiscarded += nTuples;
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Discarded additional [" + runtimeType + "]" + "(" + partition + ")" + " " + nTuples);
- }
- }
-
- @Override
- public void handleDataBucket(DataBucket bucket) {
- nDiscarded++;
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Discard Count" + nDiscarded);
- }
- }
-
- @Override
- public void close() {
- // do nothing, no resource to relinquish
- }
-
- @Override
- public Iterator<ByteBuffer> replayData() throws HyracksDataException {
- throw new IllegalStateException("Invalid operation");
- }
-
- @Override
- public String toString() {
- return "DiscardRouter" + "[" + feedId + "]" + "(" + nDiscarded + ")";
- }
-
- @Override
- public String getSummary() {
- return new String("Number of discarded frames (since last reset)" + " feedId " + "[" + feedId + "]" + "("
- + nDiscarded + ")");
- }
-
- }
-
- public static class InMemoryRouter implements IFeedFrameHandler {
-
- private final Collection<FeedFrameCollector> frameCollectors;
-
- public InMemoryRouter(Collection<FeedFrameCollector> frameCollectors, FeedRuntimeType runtimeType,
- int partition) {
- this.frameCollectors = frameCollectors;
- }
-
- @Override
- public void handleFrame(ByteBuffer frame) throws HyracksDataException {
- throw new IllegalStateException("Operation not supported");
- }
-
- @Override
- public void handleDataBucket(DataBucket bucket) throws InterruptedException {
- for (FeedFrameCollector collector : frameCollectors) {
- collector.sendMessage(bucket);
- }
- }
-
- @Override
- public void close() {
- // do nothing
- }
-
- @Override
- public Iterator<ByteBuffer> replayData() throws HyracksDataException {
- throw new IllegalStateException("Operation not supported");
- }
-
- @Override
- public String getSummary() {
- return "InMemoryRouter Summary";
- }
- }
-
- public static class DiskSpiller implements IFeedFrameHandler {
-
- private FrameSpiller<ByteBuffer> receiver;
- private Iterator<ByteBuffer> iterator;
-
- public DiskSpiller(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition,
- int frameSize) throws IOException {
- receiver = new FrameSpiller<ByteBuffer>(distributor, feedId, frameSize);
- }
-
- @Override
- public void handleFrame(ByteBuffer frame) throws HyracksDataException, InterruptedException {
- receiver.sendMessage(frame);
- }
-
- private static class FrameSpiller<T> extends MessageReceiver<ByteBuffer> {
-
- private final FeedId feedId;
- private BufferedOutputStream bos;
- private final ByteBuffer reusableLengthBuffer;
- private final ByteBuffer reusableDataBuffer;
- private long offset;
- private File file;
- private final FrameDistributor frameDistributor;
- private boolean fileCreated = false;
-
- public FrameSpiller(FrameDistributor distributor, FeedId feedId, int frameSize) throws IOException {
- this.feedId = feedId;
- this.frameDistributor = distributor;
- reusableLengthBuffer = ByteBuffer.allocate(4);
- reusableDataBuffer = ByteBuffer.allocate(frameSize);
- this.offset = 0;
- }
-
- @Override
- public void processMessage(ByteBuffer message) throws Exception {
- if (!fileCreated) {
- createFile();
- fileCreated = true;
- }
- reusableLengthBuffer.flip();
- reusableLengthBuffer.putInt(message.array().length);
- bos.write(reusableLengthBuffer.array());
- bos.write(message.array());
- }
-
- private void createFile() throws IOException {
- Date date = new Date();
- String dateSuffix = date.toString().replace(' ', '_');
- String fileName = feedId.toString() + "_" + frameDistributor.getFeedRuntimeType() + "_"
- + frameDistributor.getPartition() + "_" + dateSuffix;
-
- file = new File(fileName);
- if (!file.exists()) {
- boolean success = file.createNewFile();
- if (!success) {
- throw new IOException("Unable to create spill file for feed " + feedId);
- }
- }
- bos = new BufferedOutputStream(new FileOutputStream(file));
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Created Spill File for feed " + feedId);
- }
- }
-
- @SuppressWarnings("resource")
- public Iterator<ByteBuffer> replayData() throws Exception {
- final BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
- bis.skip(offset);
- return new Iterator<ByteBuffer>() {
-
- @Override
- public boolean hasNext() {
- boolean more = false;
- try {
- more = bis.available() > 0;
- if (!more) {
- bis.close();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- return more;
- }
-
- @Override
- public ByteBuffer next() {
- reusableLengthBuffer.flip();
- try {
- bis.read(reusableLengthBuffer.array());
- reusableLengthBuffer.flip();
- int frameSize = reusableLengthBuffer.getInt();
- reusableDataBuffer.flip();
- bis.read(reusableDataBuffer.array(), 0, frameSize);
- offset += 4 + frameSize;
- } catch (IOException e) {
- e.printStackTrace();
- }
- return reusableDataBuffer;
- }
-
- @Override
- public void remove() {
- }
-
- };
- }
-
- @Override
- public void emptyInbox() throws HyracksDataException {
- }
-
- }
-
- @Override
- public void handleDataBucket(DataBucket bucket) {
- throw new IllegalStateException("Operation not supported");
- }
-
- @Override
- public void close() {
- receiver.close(true);
- }
-
- @Override
- public Iterator<ByteBuffer> replayData() throws HyracksDataException {
- try {
- iterator = receiver.replayData();
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- return iterator;
- }
-
- //TODO: Form a summary that includes stats related to what has been spilled to disk
- @Override
- public String getSummary() {
- return "Disk Spiller Summary";
- }
-
- }
-
-}