You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/03/26 21:27:39 UTC
[08/13] incubator-asterixdb git commit: Improve Error Handling in
Local Directory Feeds
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java
deleted file mode 100644
index a301c1a..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.factory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.provider.SocketServerInputStreamProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.om.util.AsterixRuntimeUtil;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class SocketServerInputStreamProviderFactory implements IInputStreamProviderFactory {
-
- private static final long serialVersionUID = 1L;
- private List<Pair<String, Integer>> sockets;
- private Mode mode = Mode.IP;
-
- public static enum Mode {
- NC,
- IP
- }
-
- @Override
- public void configure(Map<String, String> configuration) throws AsterixException {
- try {
- sockets = new ArrayList<Pair<String, Integer>>();
- String modeValue = configuration.get(ExternalDataConstants.KEY_MODE);
- if (modeValue != null) {
- mode = Mode.valueOf(modeValue.trim().toUpperCase());
- }
- String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
- if (socketsValue == null) {
- throw new IllegalArgumentException(
- "\'sockets\' parameter not specified as part of adapter configuration");
- }
- Map<InetAddress, Set<String>> ncMap;
- ncMap = AsterixRuntimeUtil.getNodeControllerMap();
- List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
- String[] socketsArray = socketsValue.split(",");
- Random random = new Random();
- for (String socket : socketsArray) {
- String[] socketTokens = socket.split(":");
- String host = socketTokens[0].trim();
- int port = Integer.parseInt(socketTokens[1].trim());
- Pair<String, Integer> p = null;
- switch (mode) {
- case IP:
- Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
- if ((ncsOnIp == null) || ncsOnIp.isEmpty()) {
- throw new IllegalArgumentException("Invalid host " + host
- + " as it is not part of the AsterixDB cluster. Valid choices are "
- + StringUtils.join(ncMap.keySet(), ", "));
- }
- String[] ncArray = ncsOnIp.toArray(new String[] {});
- String nc = ncArray[random.nextInt(ncArray.length)];
- p = new Pair<String, Integer>(nc, port);
- break;
-
- case NC:
- p = new Pair<String, Integer>(host, port);
- if (!ncs.contains(host)) {
- throw new IllegalArgumentException("Invalid NC " + host
- + " as it is not part of the AsterixDB cluster. Valid choices are "
- + StringUtils.join(ncs, ", "));
-
- }
- break;
- }
- sockets.add(p);
- }
- } catch (Exception e) {
- throw new AsterixException(e);
- }
- }
-
- @Override
- public synchronized IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
- try {
- Pair<String, Integer> socket = sockets.get(partition);
- ServerSocket server;
- server = new ServerSocket(socket.second);
- return new SocketServerInputStreamProvider(server);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
- List<String> locations = new ArrayList<String>();
- for (Pair<String, Integer> socket : sockets) {
- locations.add(socket.first);
- }
- return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
- }
-
- public List<Pair<String, Integer>> getSockets() {
- return sockets;
- }
-
- @Override
- public DataSourceType getDataSourceType() {
- return DataSourceType.STREAM;
- }
-
- @Override
- public boolean isIndexible() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
new file mode 100644
index 0000000..8052822
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter
+ * simulates a twitter firehose with tweets being "pushed" into Asterix at a
+ * configurable rate measured in terms of TPS (tweets/second). The stream of
+ * tweets lasts for a configurable duration (measured in seconds).
+ */
+public class TwitterFirehoseStreamFactory implements IInputStreamFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Degree of parallelism for feed ingestion activity. Defaults to 1. This
+ * determines the count constraint for the ingestion operator.
+ **/
+ private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
+
+ /**
+ * The absolute locations where ingestion operator instances will be placed.
+ **/
+ private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
+
+ private Map<String, String> configuration;
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
+ String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
+ String[] locations = null;
+ if (ingestionLocationParam != null) {
+ locations = ingestionLocationParam.split(",");
+ }
+ int count = locations != null ? locations.length : 1;
+ if (ingestionCardinalityParam != null) {
+ count = Integer.parseInt(ingestionCardinalityParam);
+ }
+
+ List<String> chosenLocations = new ArrayList<String>();
+ String[] availableLocations = locations != null ? locations
+ : AsterixClusterProperties.INSTANCE.getParticipantNodes().toArray(new String[] {});
+ for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
+ chosenLocations.add(availableLocations[k]);
+ }
+ return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[] {}));
+ }
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.STREAM;
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public boolean isIndexible() {
+ return false;
+ }
+
+ @Override
+ public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+ try {
+ return new TwitterFirehoseInputStream(configuration, ctx, partition);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
deleted file mode 100644
index 7b09ade..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.factory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.provider.TwitterFirehoseInputStreamProvider;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter
- * simulates a twitter firehose with tweets being "pushed" into Asterix at a
- * configurable rate measured in terms of TPS (tweets/second). The stream of
- * tweets lasts for a configurable duration (measured in seconds).
- */
-public class TwitterFirehoseStreamProviderFactory implements IInputStreamProviderFactory {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * Degree of parallelism for feed ingestion activity. Defaults to 1. This
- * determines the count constraint for the ingestion operator.
- **/
- private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
-
- /**
- * The absolute locations where ingestion operator instances will be placed.
- **/
- private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
-
- private Map<String, String> configuration;
-
- @Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
- String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
- String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
- String[] locations = null;
- if (ingestionLocationParam != null) {
- locations = ingestionLocationParam.split(",");
- }
- int count = locations != null ? locations.length : 1;
- if (ingestionCardinalityParam != null) {
- count = Integer.parseInt(ingestionCardinalityParam);
- }
-
- List<String> chosenLocations = new ArrayList<String>();
- String[] availableLocations = locations != null ? locations
- : AsterixClusterProperties.INSTANCE.getParticipantNodes().toArray(new String[] {});
- for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
- chosenLocations.add(availableLocations[k]);
- }
- return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[] {}));
- }
-
- @Override
- public DataSourceType getDataSourceType() {
- return DataSourceType.STREAM;
- }
-
- @Override
- public void configure(Map<String, String> configuration) {
- this.configuration = configuration;
- }
-
- @Override
- public boolean isIndexible() {
- return false;
- }
-
- @Override
- public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
- return new TwitterFirehoseInputStreamProvider(configuration, ctx, partition);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
deleted file mode 100644
index e1ab331..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.provider;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
-import org.apache.asterix.external.input.stream.AInputStream;
-import org.apache.asterix.external.provider.ExternalIndexerProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implements IInputStreamProvider {
-
- public HDFSInputStreamProvider(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
- JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot) throws Exception {
- super(read, inputSplits, readSchedule, nodeName, conf, snapshot,
- snapshot == null ? null : ExternalIndexerProvider.getIndexer(configuration));
- value = new Text();
- if (snapshot != null) {
- if (currentSplitIndex < snapshot.size()) {
- indexer.reset(this);
- }
- }
- }
-
- @Override
- public AInputStream getInputStream() {
- return new HDFSInputStream();
- }
-
- private class HDFSInputStream extends AInputStream {
- int pos = 0;
-
- @Override
- public int read() throws IOException {
- if (value.getLength() < pos) {
- if (!readMore()) {
- return -1;
- }
- } else if (value.getLength() == pos) {
- pos++;
- return ExternalDataConstants.BYTE_LF;
- }
- return value.getBytes()[pos++];
- }
-
- private int readRecord(byte[] buffer, int offset, int len) {
- int actualLength = value.getLength() + 1;
- if ((actualLength - pos) > len) {
- //copy partial record
- System.arraycopy(value.getBytes(), pos, buffer, offset, len);
- pos += len;
- return len;
- } else {
- int numBytes = value.getLength() - pos;
- System.arraycopy(value.getBytes(), pos, buffer, offset, numBytes);
- buffer[offset + numBytes] = ExternalDataConstants.LF;
- pos += numBytes;
- numBytes++;
- return numBytes;
- }
- }
-
- @Override
- public int read(byte[] buffer, int offset, int len) throws IOException {
- if (value.getLength() > pos) {
- return readRecord(buffer, offset, len);
- }
- if (!readMore()) {
- return -1;
- }
- return readRecord(buffer, offset, len);
- }
-
- private boolean readMore() throws IOException {
- try {
- pos = 0;
- return HDFSInputStreamProvider.this.hasNext();
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public boolean skipError() throws Exception {
- return true;
- }
-
- @Override
- public boolean stop() throws Exception {
- return false;
- }
-
- @Override
- public void setFeedLogManager(FeedLogManager logManager) {
- }
-
- @Override
- public void setController(AbstractFeedDataFlowController controller) {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
deleted file mode 100644
index fbe6035..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.provider;
-
-import java.nio.file.Path;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.input.stream.AInputStream;
-import org.apache.asterix.external.input.stream.LocalFileSystemInputStream;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-
-public class LocalFSInputStreamProvider implements IInputStreamProvider {
-
- private final String expression;
- private final boolean isFeed;
- private final Path path;
- private FeedLogManager feedLogManager;
-
- public LocalFSInputStreamProvider(final FileSplit[] fileSplits, final IHyracksTaskContext ctx,
- final Map<String, String> configuration, final int partition, final String expression,
- final boolean isFeed) {
- this.expression = expression;
- this.isFeed = isFeed;
- this.path = fileSplits[partition].getLocalFile().getFile().toPath();
- }
-
- @Override
- public AInputStream getInputStream() throws HyracksDataException {
- final LocalFileSystemInputStream stream = new LocalFileSystemInputStream(path, expression, isFeed);
- stream.setFeedLogManager(feedLogManager);
- return stream;
- }
-
- @Override
- public void setFeedLogManager(final FeedLogManager feedLogManager) {
- this.feedLogManager = feedLogManager;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java
deleted file mode 100644
index f842638..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.provider;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.stream.AInputStream;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
-
-public class SocketClientInputStreamProvider implements IInputStreamProvider {
-
- private static final Logger LOGGER = Logger.getLogger(SocketClientInputStreamProvider.class.getName());
- private final Socket socket;
-
- public SocketClientInputStreamProvider(Pair<String, Integer> ipAndPort) throws HyracksDataException {
- try {
- socket = new Socket(ipAndPort.first, ipAndPort.second);
- } catch (IOException e) {
- LOGGER.error(
- "Problem in creating socket against host " + ipAndPort.first + " on the port " + ipAndPort.second,
- e);
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public AInputStream getInputStream() throws HyracksDataException {
- InputStream in;
- try {
- in = socket.getInputStream();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- return new AInputStream() {
- @Override
- public int read() throws IOException {
- throw new IOException("method not supported. use read(byte[] buffer, int offset, int length) instead");
- }
-
- @Override
- public int read(byte[] buffer, int offset, int length) throws IOException {
- return in.read(buffer, offset, length);
- }
-
- @Override
- public boolean stop() throws Exception {
- if (!socket.isClosed()) {
- try {
- in.close();
- } finally {
- socket.close();
- }
- }
- return true;
- }
-
- @Override
- public boolean skipError() throws Exception {
- return false;
- }
-
- @Override
- public void setFeedLogManager(FeedLogManager logManager) {
- }
-
- @Override
- public void setController(AbstractFeedDataFlowController controller) {
- }
- };
- }
-
- @Override
- public void setFeedLogManager(FeedLogManager feedLogManager) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java
deleted file mode 100644
index 64f0342..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.provider;
-
-import java.net.ServerSocket;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.input.stream.AInputStream;
-import org.apache.asterix.external.input.stream.SocketServerInputStream;
-import org.apache.asterix.external.util.FeedLogManager;
-
-public class SocketServerInputStreamProvider implements IInputStreamProvider {
- private final ServerSocket server;
-
- public SocketServerInputStreamProvider(ServerSocket server) {
- this.server = server;
- }
-
- @Override
- public AInputStream getInputStream() {
- return new SocketServerInputStream(server);
- }
-
- @Override
- public void setFeedLogManager(FeedLogManager feedLogManager) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
deleted file mode 100644
index a979262..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.provider;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.stream.AInputStream;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.TweetGenerator;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider {
-
- private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStreamProvider.class.getName());
-
- private final ExecutorService executorService;
-
- private final PipedOutputStream outputStream;
-
- private final PipedInputStream inputStream;
-
- private final TwitterServer twitterServer;
-
- public TwitterFirehoseInputStreamProvider(Map<String, String> configuration, IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
- try {
- executorService = Executors.newCachedThreadPool();
- outputStream = new PipedOutputStream();
- inputStream = new PipedInputStream(outputStream);
- twitterServer = new TwitterServer(configuration, partition, outputStream, executorService, inputStream);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public AInputStream getInputStream() {
- return twitterServer;
- }
-
- private static class TwitterServer extends AInputStream {
- private final DataProvider dataProvider;
- private final ExecutorService executorService;
- private final InputStream in;
- private boolean started;
-
- public TwitterServer(Map<String, String> configuration, int partition, OutputStream os,
- ExecutorService executorService, InputStream in) {
- dataProvider = new DataProvider(configuration, partition, os);
- this.executorService = executorService;
- this.in = in;
- this.started = false;
- }
-
- @Override
- public boolean stop() throws IOException {
- dataProvider.stop();
- return true;
- }
-
- public synchronized void start() {
- if (!started) {
- executorService.execute(dataProvider);
- started = true;
- }
- }
-
- @Override
- public boolean skipError() throws Exception {
- return false;
- }
-
- @Override
- public int read() throws IOException {
- if (!started) {
- start();
- }
- return in.read();
- }
-
- @Override
- public int read(byte b[], int off, int len) throws IOException {
- if (!started) {
- start();
- started = true;
- }
- return in.read(b, off, len);
- }
-
- @Override
- public void setFeedLogManager(FeedLogManager logManager) {
- }
-
- @Override
- public void setController(AbstractFeedDataFlowController controller) {
- }
- }
-
- private static class DataProvider implements Runnable {
-
- public static final String KEY_MODE = "mode";
-
- private final TweetGenerator tweetGenerator;
- private boolean continuePush = true;
- private int batchSize;
- private final Mode mode;
- private final OutputStream os;
-
- public static enum Mode {
- AGGRESSIVE,
- CONTROLLED
- }
-
- public DataProvider(Map<String, String> configuration, int partition, OutputStream os) {
- this.tweetGenerator = new TweetGenerator(configuration, partition);
- this.tweetGenerator.registerSubscriber(os);
- this.os = os;
- mode = configuration.get(KEY_MODE) != null ? Mode.valueOf(configuration.get(KEY_MODE).toUpperCase())
- : Mode.AGGRESSIVE;
- switch (mode) {
- case CONTROLLED:
- String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
- if (tpsValue == null) {
- throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
- }
- batchSize = Integer.parseInt(tpsValue);
- break;
- case AGGRESSIVE:
- batchSize = 5000;
- break;
- }
- }
-
- @Override
- public void run() {
- boolean moreData = true;
- long startBatch;
- long endBatch;
- while (true) {
- try {
- while (moreData && continuePush) {
- switch (mode) {
- case AGGRESSIVE:
- moreData = tweetGenerator.generateNextBatch(batchSize);
- break;
- case CONTROLLED:
- startBatch = System.currentTimeMillis();
- moreData = tweetGenerator.generateNextBatch(batchSize);
- endBatch = System.currentTimeMillis();
- if ((endBatch - startBatch) < 1000) {
- Thread.sleep(1000 - (endBatch - startBatch));
- }
- break;
- }
- }
- os.close();
- break;
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception in adapter " + e.getMessage());
- }
- }
- }
- }
-
- public void stop() {
- continuePush = false;
- }
- }
-
- @Override
- public void setFeedLogManager(FeedLogManager feedLogManager) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index 8475e45..9485b77 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -19,11 +19,9 @@
package org.apache.asterix.external.operators;
import java.util.Map;
-import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
import org.apache.asterix.external.feed.api.IFeedManager;
import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
@@ -52,7 +50,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = Logger.getLogger(FeedCollectOperatorDescriptor.class.getName());
- /** The type associated with the ADM data output from the feed adaptor */
+ /** The type associated with the ADM data output from (the feed adapter OR the compute operator) */
private final IAType outputType;
/** unique identifier for a feed instance. */
@@ -67,12 +65,12 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
/** The source feed from which the feed derives its data from. **/
private final FeedId sourceFeedId;
- /** The subscription location at which the recipient feed receives tuples from the source feed **/
- private final ConnectionLocation subscriptionLocation;
+ /** The subscription location at which the recipient feed receives tuples from the source feed {SOURCE_FEED_INTAKE_STAGE , SOURCE_FEED_COMPUTE_STAGE} **/
+ private final FeedRuntimeType subscriptionLocation;
public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, FeedId sourceFeedId,
ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
- ConnectionLocation subscriptionLocation) {
+ FeedRuntimeType subscriptionLocation) {
super(spec, 0, 1);
recordDescriptors[0] = rDesc;
this.outputType = atype;
@@ -85,45 +83,25 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
- throws HyracksDataException {
+ throws HyracksDataException {
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
.getApplicationContext().getApplicationObject();
this.subscriptionManager = ((IFeedManager) runtimeCtx.getFeedManager()).getFeedSubscriptionManager();
ISubscribableRuntime sourceRuntime = null;
- IOperatorNodePushable nodePushable = null;
+ SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
+ subscriptionLocation, partition);
switch (subscriptionLocation) {
- case SOURCE_FEED_INTAKE_STAGE:
- try {
- SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
- FeedRuntimeType.INTAKE, partition);
- sourceRuntime = getIntakeRuntime(feedSubscribableRuntimeId);
- if (sourceRuntime == null) {
- throw new HyracksDataException(
- "Source intake task not found for source feed id " + sourceFeedId);
- }
- nodePushable = new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
- feedPolicyProperties, partition, nPartitions, sourceRuntime);
-
- } catch (Exception exception) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Initialization of the feed adaptor failed with exception " + exception);
- }
- throw new HyracksDataException("Initialization of the feed adapter failed", exception);
- }
+ case INTAKE:
+ sourceRuntime = getIntakeRuntime(feedSubscribableRuntimeId);
break;
- case SOURCE_FEED_COMPUTE_STAGE:
- SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
- FeedRuntimeType.COMPUTE, partition);
+ case COMPUTE:
sourceRuntime = subscriptionManager.getSubscribableRuntime(feedSubscribableRuntimeId);
- if (sourceRuntime == null) {
- throw new HyracksDataException("Source compute task not found for source feed id " + sourceFeedId
- + " " + FeedRuntimeType.COMPUTE + "[" + partition + "]");
- }
- nodePushable = new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
- feedPolicyProperties, partition, nPartitions, sourceRuntime);
break;
+ default:
+ throw new HyracksDataException("Can't subscirbe to FeedRuntime with Type: " + subscriptionLocation);
}
- return nodePushable;
+ return new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId, feedPolicyProperties, partition,
+ nPartitions, sourceRuntime);
}
public FeedConnectionId getFeedConnectionId() {
@@ -150,7 +128,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
return (IngestionRuntime) subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
}
- public ConnectionLocation getSubscriptionLocation() {
+ public FeedRuntimeType getSubscriptionLocation() {
return subscriptionLocation;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 7901f03..87e1edb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -46,10 +46,9 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
/**
- * The runtime for @see{FeedIntakeOperationDescriptor}
+ * The first operator in a collect job in a feed.
*/
public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-
private static Logger LOGGER = Logger.getLogger(FeedCollectOperatorNodePushable.class.getName());
private final int partition;
@@ -74,19 +73,16 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
this.connectionId = feedConnectionId;
this.sourceRuntime = sourceRuntime;
this.feedPolicy = feedPolicy;
- policyAccessor = new FeedPolicyAccessor(feedPolicy);
- IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
- this.feedManager = (IFeedManager) runtimeCtx.getFeedManager();
+ this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
+ this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+ .getApplicationObject()).getFeedManager();
}
@Override
public void initialize() throws HyracksDataException {
try {
outputRecordDescriptor = recordDesc;
- FeedRuntimeType sourceRuntimeType = ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId())
- .getFeedRuntimeType();
- switch (sourceRuntimeType) {
+ switch (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType()) {
case INTAKE:
handleCompleteConnection();
break;
@@ -94,7 +90,8 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
handlePartialConnection();
break;
default:
- throw new IllegalStateException("Invalid source type " + sourceRuntimeType);
+ throw new IllegalStateException("Invalid source type "
+ + ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType());
}
State state = collectRuntime.waitTillCollectionOver();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractDataParser.java
index cfd1b9c..2517166 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractDataParser.java
@@ -114,7 +114,7 @@ public abstract class AbstractDataParser implements IDataParser {
.getSerializerDeserializer(BuiltinType.ADOUBLE);
@SuppressWarnings("unchecked")
protected ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ASTRING);
+ .getAStringSerializerDeserializer();
@SuppressWarnings("unchecked")
protected ISerializerDeserializer<ABinary> binarySerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ABINARY);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 159ea73..d362201 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -22,12 +22,12 @@ import java.io.IOException;
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.api.IDataParserFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IIndexingDatasource;
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordDataParserFactory;
import org.apache.asterix.external.api.IRecordReader;
@@ -44,7 +44,6 @@ import org.apache.asterix.external.dataflow.FeedWithMetaDataFlowController;
import org.apache.asterix.external.dataflow.IndexingDataFlowController;
import org.apache.asterix.external.dataflow.RecordDataFlowController;
import org.apache.asterix.external.dataflow.StreamDataFlowController;
-import org.apache.asterix.external.input.stream.AInputStream;
import org.apache.asterix.external.parser.RecordWithMetadataParser;
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -106,17 +105,15 @@ public class DataflowControllerProvider {
recordReader, 1);
}
case STREAM:
- IInputStreamProviderFactory streamProviderFactory = (IInputStreamProviderFactory) dataSourceFactory;
- IInputStreamProvider streamProvider = streamProviderFactory.createInputStreamProvider(ctx,
- partition);
+ IInputStreamFactory streamFactory = (IInputStreamFactory) dataSourceFactory;
+ AsterixInputStream stream = streamFactory.createInputStream(ctx, partition);
IStreamDataParserFactory streamParserFactory = (IStreamDataParserFactory) dataParserFactory;
IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition);
- AInputStream inputStream = streamProvider.getInputStream();
- streamParser.setInputStream(inputStream);
+ streamParser.setInputStream(stream);
if (isFeed) {
return new FeedStreamDataFlowController(ctx,
(FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager),
- feedLogManager, FeedUtils.getNumOfFields(configuration), streamParser, inputStream);
+ feedLogManager, FeedUtils.getNumOfFields(configuration), streamParser, stream);
} else {
return new StreamDataFlowController(ctx, DataflowUtils.getTupleForwarder(configuration, null),
streamParser);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index e9307e5..f8d64e0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -23,7 +23,7 @@ import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import org.apache.asterix.external.input.record.reader.RecordWithPKTestReaderFactory;
@@ -33,9 +33,9 @@ import org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparated
import org.apache.asterix.external.input.record.reader.stream.LineRecordReaderFactory;
import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReaderFactory;
import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
-import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamProviderFactory;
+import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory;
+import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
+import org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -53,9 +53,9 @@ public class DatasourceFactoryProvider {
}
}
- public static IInputStreamProviderFactory getInputStreamFactory(String streamSource,
+ public static IInputStreamFactory getInputStreamFactory(String streamSource,
Map<String, String> configuration) throws AsterixException {
- IInputStreamProviderFactory streamSourceFactory;
+ IInputStreamFactory streamSourceFactory;
if (ExternalDataUtils.isExternal(streamSource)) {
String dataverse = ExternalDataUtils.getDataverse(configuration);
streamSourceFactory = ExternalDataUtils.createExternalInputStreamFactory(dataverse, streamSource);
@@ -65,17 +65,17 @@ public class DatasourceFactoryProvider {
streamSourceFactory = new HDFSDataSourceFactory();
break;
case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM:
- streamSourceFactory = new LocalFSInputStreamProviderFactory();
+ streamSourceFactory = new LocalFSInputStreamFactory();
break;
case ExternalDataConstants.STREAM_SOCKET:
case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
- streamSourceFactory = new SocketServerInputStreamProviderFactory();
+ streamSourceFactory = new SocketServerInputStreamFactory();
break;
case ExternalDataConstants.STREAM_SOCKET_CLIENT:
- streamSourceFactory = new SocketServerInputStreamProviderFactory();
+ streamSourceFactory = new SocketServerInputStreamFactory();
break;
case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
- streamSourceFactory = new TwitterFirehoseStreamProviderFactory();
+ streamSourceFactory = new TwitterFirehoseStreamFactory();
break;
default:
throw new AsterixException("unknown input stream factory");
@@ -90,7 +90,7 @@ public class DatasourceFactoryProvider {
return ExternalDataUtils.createExternalRecordReaderFactory(configuration);
}
String parser = configuration.get(ExternalDataConstants.KEY_PARSER);
- IInputStreamProviderFactory inputStreamFactory;
+ IInputStreamFactory inputStreamFactory;
switch (parser) {
case ExternalDataConstants.FORMAT_ADM:
case ExternalDataConstants.FORMAT_JSON:
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 0e68698..a02152b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -206,8 +206,9 @@ public class ExternalDataConstants {
* Size default values
*/
public static final int DEFAULT_BUFFER_SIZE = 4096;
- public static final int DEFAULT_BUFFER_INCREMENT = 4096;
+ public static final int DEFAULT_BUFFER_INCREMENT = 2048;
public static final int DEFAULT_QUEUE_SIZE = 64;
+ public static final int MAX_RECORD_SIZE = 32000000;
/**
* Expected parameter values
@@ -226,4 +227,6 @@ public class ExternalDataConstants {
public static final String FORMAT_CSV = "csv";
public static final String TEST_RECORD_WITH_PK = "test-record-with-pk";
+ public static final String ERROR_LARGE_RECORD = "Record is too large";
+ public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 32139f1..42fe8bf 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -25,7 +25,7 @@ import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.IDataParserFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.library.ExternalLibraryManager;
import org.apache.asterix.om.types.ARecordType;
@@ -116,13 +116,13 @@ public class ExternalDataUtils {
return aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[1];
}
- public static IInputStreamProviderFactory createExternalInputStreamFactory(String dataverse, String stream)
+ public static IInputStreamFactory createExternalInputStreamFactory(String dataverse, String stream)
throws AsterixException {
try {
String libraryName = getLibraryName(stream);
String className = getExternalClassName(stream);
ClassLoader classLoader = getClassLoader(dataverse, libraryName);
- return ((IInputStreamProviderFactory) (classLoader.loadClass(className).newInstance()));
+ return ((IInputStreamFactory) (classLoader.loadClass(className).newInstance()));
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
throw new AsterixException("Failed to create stream factory", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
index 386a8cf..4eec348 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
@@ -42,7 +42,7 @@ import org.apache.log4j.Logger;
public class FileSystemWatcher {
private static final Logger LOGGER = Logger.getLogger(FileSystemWatcher.class.getName());
- private final WatchService watcher;
+ private WatchService watcher;
private final HashMap<WatchKey, Path> keys;
private final LinkedList<File> files = new LinkedList<File>();
private Iterator<File> it;
@@ -53,17 +53,14 @@ public class FileSystemWatcher {
private boolean done;
private File current;
private AbstractFeedDataFlowController controller;
+ private final LinkedList<Path> dirs;
- public FileSystemWatcher(Path inputResource, String expression, boolean isFeed) throws HyracksDataException {
- try {
- this.watcher = isFeed ? FileSystems.getDefault().newWatchService() : null;
- this.keys = isFeed ? new HashMap<WatchKey, Path>() : null;
- this.expression = expression;
- this.path = inputResource;
- this.isFeed = isFeed;
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
+ public FileSystemWatcher(Path inputResource, String expression, boolean isFeed) {
+ this.keys = isFeed ? new HashMap<WatchKey, Path>() : null;
+ this.expression = expression;
+ this.path = inputResource;
+ this.isFeed = isFeed;
+ this.dirs = new LinkedList<Path>();
}
public void setFeedLogManager(FeedLogManager feedLogManager) {
@@ -72,11 +69,19 @@ public class FileSystemWatcher {
public void init() throws HyracksDataException {
try {
- LinkedList<Path> dirs = null;
- dirs = new LinkedList<Path>();
+ dirs.clear();
LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs);
it = files.iterator();
if (isFeed) {
+ keys.clear();
+ if (watcher != null) {
+ try {
+ watcher.close();
+ } catch (IOException e) {
+ LOGGER.warn("Failed to close watcher service", e);
+ }
+ }
+ watcher = FileSystems.getDefault().newWatchService();
for (Path path : dirs) {
register(path);
}
@@ -125,7 +130,7 @@ public class FileSystemWatcher {
return (WatchEvent<T>) event;
}
- private void handleEvents(WatchKey key) {
+ private void handleEvents(WatchKey key) throws IOException {
// get dir associated with the key
Path dir = keys.get(key);
if (dir == null) {
@@ -143,7 +148,10 @@ public class FileSystemWatcher {
if (LOGGER.isEnabledFor(Level.WARN)) {
LOGGER.warn("Overflow event. Some events might have been missed");
}
- continue;
+ // need to read and validate all files.
+ //TODO: use btrees for all logs
+ init();
+ return;
}
// Context for directory entry event is the file name of entry
@@ -172,6 +180,7 @@ public class FileSystemWatcher {
if (!done) {
if (watcher != null) {
watcher.close();
+ watcher = null;
}
if (logManager != null) {
if (current != null) {
@@ -205,7 +214,7 @@ public class FileSystemWatcher {
return false;
}
- public boolean hasNext() throws HyracksDataException {
+ public boolean hasNext() throws IOException {
if (it.hasNext()) {
return true;
}
@@ -222,6 +231,7 @@ public class FileSystemWatcher {
while (key != null) {
handleEvents(key);
if (endOfEvents(key)) {
+ close();
return false;
}
key = watcher.poll();
@@ -237,12 +247,18 @@ public class FileSystemWatcher {
if (LOGGER.isEnabledFor(Level.WARN)) {
LOGGER.warn("Feed Closed");
}
- return false;
+ if (watcher == null) {
+ return false;
+ }
+ continue;
} catch (ClosedWatchServiceException e) {
if (LOGGER.isEnabledFor(Level.WARN)) {
LOGGER.warn("The watcher has exited");
}
- return false;
+ if (watcher == null) {
+ return false;
+ }
+ continue;
}
handleEvents(key);
if (endOfEvents(key)) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 84ccc35..936ada5 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -28,7 +28,7 @@ import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingScheduler;
import org.apache.asterix.external.indexing.RecordId.RecordIdType;
-import org.apache.asterix.external.input.stream.provider.HDFSInputStreamProvider;
+import org.apache.asterix.external.input.stream.HDFSInputStream;
import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.hadoop.fs.BlockLocation;
@@ -186,7 +186,7 @@ public class HDFSUtils {
conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI,
configuration.get(ExternalDataConstants.KEY_HDFS_URL).trim());
conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_CLASS, ExternalDataConstants.CLASS_NAME_HDFS_FILESYSTEM);
- conf.setClassLoader(HDFSInputStreamProvider.class.getClassLoader());
+ conf.setClassLoader(HDFSInputStream.class.getClassLoader());
conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_DIR, configuration.get(ExternalDataConstants.KEY_PATH).trim());
conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_FORMAT, formatClassName);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java
index 04fe6ec..da2cebd 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java
@@ -18,41 +18,42 @@
*/
package org.apache.asterix.external.classad;
+import org.apache.asterix.external.classad.object.pool.ClassAdObjectPool;
import org.apache.asterix.om.base.AMutableInt32;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class AttributeReference extends ExprTree {
- private ExprTree expr;
+ private final ExprTreeHolder expr;
private boolean absolute;
- private AMutableCharArrayString attributeStr;
- private ClassAd current = new ClassAd(false, false);
- private ExprList adList = new ExprList();
- private Value val = new Value();
- private MutableBoolean rVal = new MutableBoolean(false);
+ private final AMutableCharArrayString attributeStr;
+ private final ExprList adList;
+ private final Value val;
private AttributeReference tempAttrRef;
- private EvalState tstate = new EvalState();
+ private final EvalState tstate;
+ private MutableBoolean rVal = new MutableBoolean(false);
+ private ClassAd current;
public ExprTree getExpr() {
return expr;
}
public void setExpr(ExprTree expr) {
- this.expr = expr == null ? null : expr.self();
+ this.expr.setInnerTree(expr.self());
}
- public AttributeReference() {
- expr = null;
- attributeStr = null;
+ public AttributeReference(ClassAdObjectPool objectPool) {
+ super(objectPool);
+ this.val = new Value(objectPool);
+ this.current = new ClassAd(this.objectPool);
+ this.tstate = new EvalState(this.objectPool);
+ this.adList = new ExprList(this.objectPool);
+ this.attributeStr = new AMutableCharArrayString();
+ this.expr = new ExprTreeHolder(objectPool);
absolute = false;
}
- /// Copy Constructor
- public AttributeReference(AttributeReference ref) throws HyracksDataException {
- copyFrom(ref);
- }
-
/// Assignment operator
@Override
public boolean equals(Object o) {
@@ -69,8 +70,9 @@ public class AttributeReference extends ExprTree {
return NodeKind.ATTRREF_NODE;
}
- public static AttributeReference createAttributeReference(ExprTree expr, AMutableCharArrayString attrName) {
- return createAttributeReference(expr, attrName, false);
+ public static AttributeReference createAttributeReference(ExprTree expr, AMutableCharArrayString attrName,
+ ClassAdObjectPool objectPool) {
+ return createAttributeReference(expr, attrName, false, objectPool);
}
/**
@@ -80,7 +82,7 @@ public class AttributeReference extends ExprTree {
*/
@Override
public ExprTree copy() throws HyracksDataException {
- AttributeReference newTree = new AttributeReference();
+ AttributeReference newTree = objectPool.attrRefPool.get();
newTree.copyFrom(this);
return newTree;
}
@@ -94,14 +96,8 @@ public class AttributeReference extends ExprTree {
* @throws HyracksDataException
*/
public boolean copyFrom(AttributeReference ref) throws HyracksDataException {
- if (attributeStr == null) {
- attributeStr = new AMutableCharArrayString(ref.attributeStr);
- } else {
- attributeStr.setValue(ref.attributeStr);
- }
- if (ref.expr != null) {
- expr = ref.expr.copy();
- }
+ attributeStr.setValue(ref.attributeStr);
+ expr.setInnerTree(ref.expr);
super.copyFrom(ref);
this.absolute = ref.absolute;
return true;
@@ -127,7 +123,7 @@ public class AttributeReference extends ExprTree {
if (absolute != other_ref.absolute || !attributeStr.equals(other_ref.attributeStr)) {
is_same = false;
} else if ((expr == null && other_ref.expr == null) || (expr.equals(other_ref.expr))
- || (expr != null && other_ref.expr != null && ((AttributeReference) expr).sameAs(other_ref.expr))) {
+ || (expr != null && other_ref.expr != null && expr.sameAs(other_ref.expr))) {
// Will this check result in infinite recursion? How do I stop it?
is_same = true;
} else {
@@ -137,18 +133,9 @@ public class AttributeReference extends ExprTree {
return is_same;
}
- // a private ctor for use in significant expr identification
- private AttributeReference(ExprTree tree, AMutableCharArrayString attrname, boolean absolut) {
- attributeStr = attrname;
- expr = tree == null ? null : tree.self();
- absolute = absolut;
- }
-
@Override
public void privateSetParentScope(ClassAd parent) {
- if (expr != null) {
- expr.setParentScope(parent);
- }
+ expr.setParentScope(parent);
}
public void getComponents(ExprTreeHolder tree, AMutableCharArrayString attr, MutableBoolean abs)
@@ -161,7 +148,7 @@ public class AttributeReference extends ExprTree {
public EvalResult findExpr(EvalState state, ExprTreeHolder tree, ExprTreeHolder sig, boolean wantSig)
throws HyracksDataException {
// establish starting point for search
- if (expr == null) {
+ if (expr.getTree() == null) {
// "attr" and ".attr"
current = absolute ? state.getRootAd() : state.getCurAd();
if (absolute && (current == null)) { // NAC - circularity so no root
@@ -186,7 +173,7 @@ public class AttributeReference extends ExprTree {
}
if (val.isListValue()) {
- ExprList eList = new ExprList();
+ ExprList eList = objectPool.exprListPool.get();
//
// iterate through exprList and apply attribute reference
// to each exprTree
@@ -195,12 +182,12 @@ public class AttributeReference extends ExprTree {
return (EvalResult.EVAL_FAIL);
} else {
if (tempAttrRef == null) {
- tempAttrRef = new AttributeReference();
+ tempAttrRef = objectPool.attrRefPool.get();
} else {
tempAttrRef.reset();
}
createAttributeReference(currExpr.copy(), attributeStr, false, tempAttrRef);
- val.clear();
+ val.setUndefinedValue();
// Create new EvalState, within this scope, because
// attrRef is only temporary, so we do not want to
// cache the evaluated result in the outer state object.
@@ -212,8 +199,8 @@ public class AttributeReference extends ExprTree {
return (EvalResult.EVAL_FAIL);
}
- ClassAd evaledAd = new ClassAd();
- ExprList evaledList = new ExprList();
+ ClassAd evaledAd = objectPool.classAdPool.get();
+ ExprList evaledList = objectPool.exprListPool.get();
if (val.isClassAdValue(evaledAd)) {
eList.add(evaledAd);
continue;
@@ -221,12 +208,13 @@ public class AttributeReference extends ExprTree {
eList.add(evaledList.copy());
continue;
} else {
- eList.add(Literal.createLiteral(val));
+ eList.add(Literal.createLiteral(val, objectPool));
}
}
}
- tree.setInnerTree(ExprList.createExprList(eList));
- ClassAd newRoot = new ClassAd();
+
+ tree.setInnerTree(ExprList.createExprList(eList, objectPool));
+ ClassAd newRoot = objectPool.classAdPool.get();
tree.setParentScope(newRoot);
return EvalResult.EVAL_OK;
}
@@ -242,8 +230,15 @@ public class AttributeReference extends ExprTree {
if (current == null) {
return EvalResult.EVAL_UNDEF;
}
- int rc = current.lookupInScope(attributeStr.toString(), tree, state);
- if (expr == null && !absolute && rc == EvalResult.EVAL_UNDEF.ordinal() && current.getAlternateScope() != null) {
+ int rc = 0;
+ try {
+ rc = current.lookupInScope(attributeStr.toString(), tree, state);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw th;
+ }
+ if (expr.getTree() == null && !absolute && rc == EvalResult.EVAL_UNDEF.ordinal()
+ && current.getAlternateScope() != null) {
rc = current.getAlternateScope().lookupInScope(attributeStr.toString(), tree, state);
}
return EvalResult.values()[rc];
@@ -251,9 +246,10 @@ public class AttributeReference extends ExprTree {
@Override
public boolean publicEvaluate(EvalState state, Value val) throws HyracksDataException {
- ExprTreeHolder tree = new ExprTreeHolder();
- ExprTreeHolder dummy = new ExprTreeHolder();
- ClassAd curAd = new ClassAd(state.getCurAd());
+ ExprTreeHolder tree = objectPool.mutableExprPool.get();
+ ExprTreeHolder dummy = objectPool.mutableExprPool.get();
+ ClassAd curAd = objectPool.classAdPool.get();
+ curAd.copyFrom(state.getCurAd());
boolean rval;
// find the expression and the evalstate
switch (findExpr(state, tree, dummy, false)) {
@@ -286,10 +282,13 @@ public class AttributeReference extends ExprTree {
@Override
public boolean privateEvaluate(EvalState state, Value val, ExprTreeHolder sig) throws HyracksDataException {
- ExprTreeHolder tree = new ExprTreeHolder();
- ExprTreeHolder exprSig = new ExprTreeHolder();
- ClassAd curAd = new ClassAd(state.getCurAd());
- MutableBoolean rval = new MutableBoolean(true);
+ ExprTreeHolder tree = objectPool.mutableExprPool.get();
+ ExprTreeHolder exprSig = objectPool.mutableExprPool.get();
+ ClassAd curAd = objectPool.classAdPool.get();
+ curAd.copyFrom(state.getCurAd());
+ MutableBoolean rval = objectPool.boolPool.get();
+ rval.setValue(true);
+
switch (findExpr(state, tree, exprSig, true)) {
case EVAL_FAIL:
rval.setValue(false);
@@ -314,7 +313,9 @@ public class AttributeReference extends ExprTree {
default:
throw new HyracksDataException("ClassAd: Should not reach here");
}
- sig.setInnerTree((new AttributeReference(exprSig, attributeStr, absolute)));
+ AttributeReference newAttrRef = objectPool.attrRefPool.get();
+ newAttrRef.setValue(exprSig, attributeStr, absolute);
+ sig.setInnerTree(newAttrRef);
state.getCurAd().setValue(curAd);
return rval.booleanValue();
}
@@ -322,8 +323,8 @@ public class AttributeReference extends ExprTree {
@Override
public boolean privateFlatten(EvalState state, Value val, ExprTreeHolder ntree, AMutableInt32 op)
throws HyracksDataException {
- ExprTreeHolder tree = new ExprTreeHolder();
- ExprTreeHolder dummy = new ExprTreeHolder();
+ ExprTreeHolder tree = objectPool.mutableExprPool.get();
+ ExprTreeHolder dummy = objectPool.mutableExprPool.get();
ClassAd curAd;
boolean rval;
ntree.setInnerTree(null); // Just to be safe... wenger 2003-12-11.
@@ -338,8 +339,8 @@ public class AttributeReference extends ExprTree {
return true;
case EVAL_UNDEF:
if (expr != null && state.isFlattenAndInline()) {
- ExprTreeHolder expr_ntree = new ExprTreeHolder();
- Value expr_val = new Value();
+ ExprTreeHolder expr_ntree = objectPool.mutableExprPool.get();
+ Value expr_val = objectPool.valuePool.get();
if (state.getDepthRemaining() <= 0) {
val.setErrorValue();
state.getCurAd().setValue(curAd);
@@ -349,7 +350,7 @@ public class AttributeReference extends ExprTree {
rval = expr.publicFlatten(state, expr_val, expr_ntree);
state.incrementDepth();
if (rval && expr_ntree.getInnerTree() != null) {
- ntree.setInnerTree(createAttributeReference(expr_ntree, attributeStr));
+ ntree.setInnerTree(createAttributeReference(expr_ntree, attributeStr, objectPool));
if (ntree.getInnerTree() != null) {
state.getCurAd().setValue(curAd);
return true;
@@ -412,14 +413,16 @@ public class AttributeReference extends ExprTree {
* expr is not NULL, default value is false;
*/
public static AttributeReference createAttributeReference(ExprTree tree, AMutableCharArrayString attrStr,
- boolean absolut) {
- return (new AttributeReference(tree, attrStr, absolut));
+ boolean absolut, ClassAdObjectPool objectPool) {
+ AttributeReference attrRef = objectPool.attrRefPool.get();
+ attrRef.setValue(tree, attrStr, absolut);
+ return attrRef;
}
public void setValue(ExprTree tree, AMutableCharArrayString attrStr, boolean absolut) {
this.absolute = absolut;
- this.attributeStr = attrStr;
- this.expr = tree == null ? null : tree.self();
+ this.attributeStr.copyValue(attrStr.getValue(), attrStr.size());
+ this.expr.setInnerTree(tree == null ? null : tree.self());
}
public static void createAttributeReference(ExprTree tree, AMutableCharArrayString attrStr, boolean absolut,
@@ -429,8 +432,8 @@ public class AttributeReference extends ExprTree {
@Override
public boolean privateEvaluate(EvalState state, Value val) throws HyracksDataException {
- ExprTreeHolder tree = new ExprTreeHolder();
- ExprTreeHolder dummy = new ExprTreeHolder();
+ ExprTreeHolder tree = objectPool.mutableExprPool.get();
+ ExprTreeHolder dummy = objectPool.mutableExprPool.get();
ClassAd curAd;
boolean rval;
@@ -467,8 +470,12 @@ public class AttributeReference extends ExprTree {
@Override
public void reset() {
- if (expr != null) {
- expr.reset();
- }
+ expr.reset();
+ val.reset();
+ current.reset();
+ tstate.reset();
+ adList.reset();
+ attributeStr.reset();
+ absolute = false;
}
}