You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:00 UTC
[06/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
index 9dcaef4..f16e24b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.external.util;
+import java.util.Arrays;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
public class ExternalDataExceptionUtils {
public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
public static final String MISSING_PARAMETER = "Missing parameter.\n";
@@ -29,4 +33,22 @@ public class ExternalDataExceptionUtils {
return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + ExternalDataConstants.LF + EXPECTED_VALUE
+ expectedValue + ExternalDataConstants.LF + PASSED_VALUE + passedValue;
}
+
+ public static String concat(String... vals) {
+ return Arrays.toString(vals);
+ }
+
+ // For now, we are accepting all exceptions as resolvable by adapter.
+ public static boolean isResolvable(Exception e) {
+ return true;
+ }
+
+ public static HyracksDataException suppress(HyracksDataException hde, Throwable th) {
+ if (hde == null) {
+ return new HyracksDataException(th);
+ } else {
+ hde.addSuppressed(th);
+ return hde;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 7c1c1b5..c9be872 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedConstants;
import org.apache.asterix.external.api.IDataParserFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
import org.apache.asterix.external.api.IInputStreamProviderFactory;
@@ -134,6 +133,15 @@ public class ExternalDataUtils {
return parserFormat != null ? parserFormat : configuration.get(ExternalDataConstants.KEY_FORMAT);
}
+ public static void setRecordFormat(Map<String, String> configuration, String format) {
+ if (!configuration.containsKey(ExternalDataConstants.KEY_DATA_PARSER)) {
+ configuration.put(ExternalDataConstants.KEY_DATA_PARSER, format);
+ }
+ if (!configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
+ configuration.put(ExternalDataConstants.KEY_FORMAT, format);
+ }
+ }
+
private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
@@ -219,4 +227,31 @@ public class ExternalDataUtils {
.substring(parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) + 1))
.newInstance();
}
+
+ public static boolean isFeed(Map<String, String> configuration) {
+ if (!configuration.containsKey(ExternalDataConstants.KEY_IS_FEED)) {
+ return false;
+ } else {
+ return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_FEED));
+ }
+ }
+
+ public static void prepareFeed(Map<String, String> configuration, String dataverseName, String feedName) {
+ if (!configuration.containsKey(ExternalDataConstants.KEY_IS_FEED)) {
+ configuration.put(ExternalDataConstants.KEY_IS_FEED, ExternalDataConstants.TRUE);
+ }
+ configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataverseName);
+ configuration.put(ExternalDataConstants.KEY_FEED_NAME, feedName);
+ }
+
+ public static boolean keepDataSourceOpen(Map<String, String> configuration) {
+ if (!configuration.containsKey(ExternalDataConstants.KEY_WAIT_FOR_DATA)) {
+ return true;
+ }
+ return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_WAIT_FOR_DATA));
+ }
+
+ public static String getFeedName(Map<String, String> configuration) {
+ return configuration.get(ExternalDataConstants.KEY_FEED_NAME);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
new file mode 100644
index 0000000..cc21360
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+public class FeedConstants {
+
+ public final static String FEEDS_METADATA_DV = "feeds_metadata";
+ public final static String FAILED_TUPLE_DATASET = "failed_tuple";
+ public final static String FAILED_TUPLE_DATASET_TYPE = "FailedTupleType";
+ public final static String FAILED_TUPLE_DATASET_KEY = "id";
+
+ public static final class StatisticsConstants {
+ public static final String INTAKE_TUPLEID = "intake-tupleid";
+ public static final String INTAKE_PARTITION = "intake-partition";
+ public static final String INTAKE_TIMESTAMP = "intake-timestamp";
+ public static final String COMPUTE_TIMESTAMP = "compute-timestamp";
+ public static final String STORE_TIMESTAMP = "store-timestamp";
+
+ }
+
+ public static final class MessageConstants {
+ public static final String MESSAGE_TYPE = "message-type";
+ public static final String NODE_ID = "nodeId";
+ public static final String DATAVERSE = "dataverse";
+ public static final String FEED = "feed";
+ public static final String DATASET = "dataset";
+ public static final String AQL = "aql";
+ public static final String RUNTIME_TYPE = "runtime-type";
+ public static final String PARTITION = "partition";
+ public static final String INTAKE_PARTITION = "intake-partition";
+ public static final String INFLOW_RATE = "inflow-rate";
+ public static final String OUTFLOW_RATE = "outflow-rate";
+ public static final String MODE = "mode";
+ public static final String CURRENT_CARDINALITY = "current-cardinality";
+ public static final String REDUCED_CARDINALITY = "reduced-cardinality";
+ public static final String VALUE_TYPE = "value-type";
+ public static final String VALUE = "value";
+ public static final String CPU_LOAD = "cpu-load";
+ public static final String N_RUNTIMES = "n_runtimes";
+ public static final String HEAP_USAGE = "heap_usage";
+ public static final String OPERAND_ID = "operand-id";
+ public static final String COMPUTE_PARTITION_RETAIN_LIMIT = "compute-partition-retain-limit";
+ public static final String LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP = "last-persisted-tuple-intake_timestamp";
+ public static final String PERSISTENCE_DELAY_WITHIN_LIMIT = "persistence-delay-within-limit";
+ public static final String AVERAGE_PERSISTENCE_DELAY = "average-persistence-delay";
+ public static final String COMMIT_ACKS = "commit-acks";
+ public static final String MAX_WINDOW_ACKED = "max-window-acked";
+ public static final String BASE = "base";
+ public static final String NOT_APPLICABLE = "N/A";
+
+ }
+
+ public static final class NamingConstants {
+ public static final String LIBRARY_NAME_SEPARATOR = "#";
+ }
+
+ public static class JobConstants {
+ public static final int DEFAULT_FRAME_SIZE = 8192;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedFrameUtil.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedFrameUtil.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedFrameUtil.java
new file mode 100644
index 0000000..a2bdd64
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedFrameUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Random;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+public class FeedFrameUtil {
+ public static ByteBuffer removeBadTuple(IHyracksTaskContext ctx, int tupleIndex, FrameTupleAccessor fta)
+ throws HyracksDataException {
+ FrameTupleAppender appender = new FrameTupleAppender();
+ IFrame slicedFrame = new VSizeFrame(ctx);
+ appender.reset(slicedFrame, true);
+ int totalTuples = fta.getTupleCount();
+ for (int ti = 0; ti < totalTuples; ti++) {
+ if (ti != tupleIndex) {
+ appender.append(fta, ti);
+ }
+ }
+ return slicedFrame.getBuffer();
+ }
+
+ public static ByteBuffer getSampledFrame(IHyracksTaskContext ctx, FrameTupleAccessor fta, int sampleSize)
+ throws HyracksDataException {
+ NChooseKIterator it = new NChooseKIterator(fta.getTupleCount(), sampleSize);
+ FrameTupleAppender appender = new FrameTupleAppender();
+ IFrame sampledFrame = new VSizeFrame(ctx);
+ appender.reset(sampledFrame, true);
+ int nextTupleIndex = 0;
+ while (it.hasNext()) {
+ nextTupleIndex = it.next();
+ appender.append(fta, nextTupleIndex);
+ }
+ return sampledFrame.getBuffer();
+ }
+
+ private static class NChooseKIterator {
+
+ private final int n;
+ private final int k;
+ private final BitSet bSet;
+ private final Random random;
+
+ private int traversed = 0;
+
+ public NChooseKIterator(int n, int k) {
+ this.n = n;
+ this.k = k;
+ this.bSet = new BitSet(n);
+ bSet.set(0, n - 1);
+ this.random = new Random();
+ }
+
+ public boolean hasNext() {
+ return traversed < k;
+ }
+
+ public int next() {
+ if (hasNext()) {
+ traversed++;
+ int startOffset = random.nextInt(n);
+ int pos = -1;
+ while (pos < 0) {
+ pos = bSet.nextSetBit(startOffset);
+ if (pos < 0) {
+ startOffset = 0;
+ }
+ }
+ bSet.clear(pos);
+ return pos;
+ } else {
+ return -1;
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
new file mode 100644
index 0000000..72b438d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.TreeSet;
+
+import org.apache.commons.io.FileUtils;
+
+public class FeedLogManager {
+
+ public enum LogEntryType {
+ START, // partition start
+ END, // partition end
+ COMMIT, // a record commit within a partition
+ SNAPSHOT // an identifier that partitions with identifiers before this one should be ignored
+ }
+
+ public static final String PROGRESS_LOG_FILE_NAME = "progress.log";
+ public static final String ERROR_LOG_FILE_NAME = "error.log";
+ public static final String BAD_RECORDS_FILE_NAME = "failed_record.log";
+ public static final String START_PREFIX = "s:";
+ public static final String END_PREFIX = "e:";
+ public static final int PREFIX_SIZE = 2;
+ private String currentPartition;
+ private TreeSet<String> completed;
+ private Path dir;
+ private BufferedWriter progressLogger;
+ private BufferedWriter errorLogger;
+ private BufferedWriter recordLogger;
+
+ public FeedLogManager(File file) {
+ this.dir = file.toPath();
+ this.completed = new TreeSet<String>();
+ }
+
+ public void endPartition() throws IOException {
+ logProgress(END_PREFIX + currentPartition);
+ completed.add(currentPartition);
+ }
+
+ public void endPartition(String partition) throws IOException {
+ currentPartition = partition;
+ logProgress(END_PREFIX + currentPartition);
+ completed.add(currentPartition);
+ }
+
+ public void startPartition(String partition) throws IOException {
+ currentPartition = partition;
+ logProgress(START_PREFIX + currentPartition);
+ }
+
+ public boolean exists() {
+ return Files.exists(dir);
+ }
+
+ public void open() throws IOException {
+ // read content of logs.
+ BufferedReader reader = Files.newBufferedReader(
+ Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME));
+ String log = reader.readLine();
+ while (log != null) {
+ if (log.startsWith(END_PREFIX)) {
+ completed.add(getSplitId(log));
+ }
+ log = reader.readLine();
+ }
+ reader.close();
+
+ progressLogger = Files.newBufferedWriter(
+ Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME),
+ StandardCharsets.UTF_8, StandardOpenOption.APPEND);
+ errorLogger = Files.newBufferedWriter(
+ Paths.get(dir.toAbsolutePath().toString() + File.separator + ERROR_LOG_FILE_NAME),
+ StandardCharsets.UTF_8, StandardOpenOption.APPEND);
+ recordLogger = Files.newBufferedWriter(
+ Paths.get(dir.toAbsolutePath().toString() + File.separator + BAD_RECORDS_FILE_NAME),
+ StandardCharsets.UTF_8, StandardOpenOption.APPEND);
+ }
+
+ public void close() throws IOException {
+ progressLogger.close();
+ errorLogger.close();
+ recordLogger.close();
+ }
+
+ public boolean create() throws IOException {
+ File f = dir.toFile();
+ f.mkdirs();
+ new File(f, PROGRESS_LOG_FILE_NAME).createNewFile();
+ new File(f, ERROR_LOG_FILE_NAME).createNewFile();
+ new File(f, BAD_RECORDS_FILE_NAME).createNewFile();
+ return true;
+ }
+
+ public boolean destroy() throws IOException {
+ File f = dir.toFile();
+ FileUtils.deleteDirectory(f);
+ return true;
+ }
+
+ public void logProgress(String log) throws IOException {
+ progressLogger.write(log);
+ progressLogger.newLine();
+ }
+
+ public void logError(String error, Throwable th) throws IOException {
+ errorLogger.append(error);
+ errorLogger.newLine();
+ errorLogger.append(th.toString());
+ errorLogger.newLine();
+ }
+
+ public void logRecord(String record, Exception e) throws IOException {
+ recordLogger.append(record);
+ recordLogger.newLine();
+ recordLogger.append(e.toString());
+ recordLogger.newLine();
+ }
+
+ public static String getSplitId(String log) {
+ return log.substring(PREFIX_SIZE);
+ }
+
+ public boolean isSplitRead(String split) {
+ return completed.contains(split);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
new file mode 100644
index 0000000..224ee31
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+public class FeedUtils {
+ private static String prepareDataverseFeedName(String dataverseName, String feedName) {
+ return dataverseName + File.separator + feedName;
+ }
+
+ public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
+ AlgebricksPartitionConstraint partitionConstraints) throws Exception {
+ File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
+ if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
+ throw new AlgebricksException("Can't create file splits for adapter with count partitioning constraints");
+ }
+ String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
+ List<FileSplit> splits = new ArrayList<FileSplit>();
+ String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+ int i = 0;
+ for (String nd : locations) {
+ // Always get the first partition
+ ClusterPartition nodePartition = AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0];
+ String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
+ nodePartition.getPartitionId());
+ // format: 'storage dir name'/partition_#/dataverse/feed/adapter_#
+ File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator
+ + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + i);
+ splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f));
+ i++;
+ }
+ return splits.toArray(new FileSplit[] {});
+ }
+
+ public static FileReference getAbsoluteFileRef(String relativePath, int ioDeviceId, IIOManager ioManager) {
+ return ioManager.getAbsoluteFileRef(ioDeviceId, relativePath);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
new file mode 100644
index 0000000..4bb9d92
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchEvent.Kind;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class FileSystemWatcher {
+
+ private static Logger LOGGER = Logger.getLogger(FileSystemWatcher.class.getName());
+ private final WatchService watcher;
+ private final HashMap<WatchKey, Path> keys;
+ private final LinkedList<File> files = new LinkedList<File>();
+ private Iterator<File> it;
+ private final String expression;
+ private final FeedLogManager logManager;
+ private final Path path;
+ private final boolean isFeed;
+ private boolean done;
+ private File current;
+
+ public FileSystemWatcher(FeedLogManager logManager, Path inputResource, String expression, boolean isFeed)
+ throws IOException {
+ this.watcher = isFeed ? FileSystems.getDefault().newWatchService() : null;
+ this.keys = isFeed ? new HashMap<WatchKey, Path>() : null;
+ this.logManager = logManager;
+ this.expression = expression;
+ this.path = inputResource;
+ this.isFeed = isFeed;
+ }
+
+ public void init() throws IOException {
+ LinkedList<Path> dirs = null;
+ dirs = new LinkedList<Path>();
+ LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs);
+ it = files.iterator();
+ if (isFeed) {
+ for (Path path : dirs) {
+ register(path);
+ }
+ resume();
+ }
+ }
+
+ /**
+ * Register the given directory, and all its sub-directories, with the
+ * WatchService.
+ */
+ private void register(Path dir) throws IOException {
+ WatchKey key = dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE,
+ StandardWatchEventKinds.ENTRY_MODIFY);
+ keys.put(key, dir);
+ }
+
+ private void resume() throws IOException {
+ if (logManager == null) {
+ return;
+ }
+ if (logManager.exists()) {
+ logManager.open();
+ } else {
+ logManager.create();
+ logManager.open();
+ return;
+ }
+ /*
+ * Done processing the progress log file. We now have:
+ * the files that were completed.
+ */
+
+ if (it == null) {
+ return;
+ }
+ while (it.hasNext()) {
+ File file = it.next();
+ if (logManager.isSplitRead(file.getAbsolutePath())) {
+ // File was read completely, remove it from the files list
+ it.remove();
+ }
+ }
+ // reset the iterator
+ it = files.iterator();
+ }
+
+ @SuppressWarnings("unchecked")
+ static <T> WatchEvent<T> cast(WatchEvent<?> event) {
+ return (WatchEvent<T>) event;
+ }
+
+ private void handleEvents(WatchKey key) {
+ // get dir associated with the key
+ Path dir = keys.get(key);
+ if (dir == null) {
+ // This should never happen
+ if (LOGGER.isEnabledFor(Level.WARN)) {
+ LOGGER.warn("WatchKey not recognized!!");
+ }
+ return;
+ }
+ for (WatchEvent<?> event : key.pollEvents()) {
+ Kind<?> kind = event.kind();
+ // TODO: Do something about overflow events
+ // An overflow event means that some events were dropped
+ if (kind == StandardWatchEventKinds.OVERFLOW) {
+ if (LOGGER.isEnabledFor(Level.WARN)) {
+ LOGGER.warn("Overflow event. Some events might have been missed");
+ }
+ continue;
+ }
+
+ // Context for directory entry event is the file name of entry
+ WatchEvent<Path> ev = cast(event);
+ Path name = ev.context();
+ Path child = dir.resolve(name);
+ // if directory is created then register it and its sub-directories
+ if ((kind == StandardWatchEventKinds.ENTRY_CREATE)) {
+ try {
+ if (Files.isDirectory(child, LinkOption.NOFOLLOW_LINKS)) {
+ register(child);
+ } else {
+ // it is a file, add it to the files list.
+ LocalFileSystemUtils.validateAndAdd(child, expression, files);
+ }
+ } catch (IOException e) {
+ if (LOGGER.isEnabledFor(Level.ERROR)) {
+ LOGGER.error(e);
+ }
+ }
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ if (!done) {
+ if (watcher != null) {
+ watcher.close();
+ }
+ if (logManager != null) {
+ if (current != null) {
+ logManager.startPartition(current.getAbsolutePath());
+ logManager.endPartition();
+ }
+ logManager.close();
+ current = null;
+ }
+ done = true;
+ }
+ }
+
+ public File next() throws IOException {
+ if (current != null && logManager != null) {
+ logManager.startPartition(current.getAbsolutePath());
+ logManager.endPartition();
+ }
+ current = it.next();
+ return current;
+ }
+
+ private boolean endOfEvents(WatchKey key) {
+ // reset key and remove from set if directory no longer accessible
+ if (!key.reset()) {
+ keys.remove(key);
+ if (keys.isEmpty()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasNext() {
+ if (it.hasNext()) {
+ return true;
+ }
+ if (done || !isFeed) {
+ return false;
+ }
+ files.clear();
+ // Read new Events (Polling first to add all available files)
+ WatchKey key;
+ key = watcher.poll();
+ while (key != null) {
+ handleEvents(key);
+ if (endOfEvents(key)) {
+ return false;
+ }
+ key = watcher.poll();
+ }
+ // No file was found, wait for the filesystem to push events
+ while (files.isEmpty()) {
+ try {
+ key = watcher.take();
+ } catch (InterruptedException x) {
+ if (LOGGER.isEnabledFor(Level.WARN)) {
+ LOGGER.warn("Feed Closed");
+ }
+ return false;
+ } catch (ClosedWatchServiceException e) {
+ if (LOGGER.isEnabledFor(Level.WARN)) {
+ LOGGER.warn("The watcher has exited");
+ }
+ return false;
+ }
+ handleEvents(key);
+ if (endOfEvents(key)) {
+ return false;
+ }
+ }
+ // files were found, re-create the iterator and move it one step
+ it = files.iterator();
+ return it.hasNext();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
new file mode 100644
index 0000000..d6e9463
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.LinkedList;
+import java.util.regex.Pattern;
+
+public class LocalFileSystemUtils {
+
+ //TODO: replace this method by FileUtils.iterateFilesAndDirs(.)
+ public static void traverse(final LinkedList<File> files, File root, final String expression,
+ final LinkedList<Path> dirs) throws IOException {
+ if (!Files.exists(root.toPath())) {
+ return;
+ }
+ if (!Files.isDirectory(root.toPath())) {
+ validateAndAdd(root.toPath(), expression, files);
+ }
+ //FileUtils.iterateFilesAndDirs(directory, fileFilter, dirFilter)
+ Files.walkFileTree(root.toPath(), new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes attrs) throws IOException {
+ if (!Files.exists(path, LinkOption.NOFOLLOW_LINKS)) {
+ return FileVisitResult.TERMINATE;
+ }
+ if (Files.isDirectory(path, LinkOption.NOFOLLOW_LINKS)) {
+ if (dirs != null) {
+ dirs.add(path);
+ }
+ //get immediate children files
+ File[] content = path.toFile().listFiles();
+ for (File file : content) {
+ if (!file.isDirectory()) {
+ validateAndAdd(file.toPath(), expression, files);
+ }
+ }
+ } else {
+ // Path is a file, add to list of files if it matches the expression
+ validateAndAdd(path, expression, files);
+ }
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ }
+
+ public static void validateAndAdd(Path path, String expression, LinkedList<File> files) {
+ if (expression == null || Pattern.matches(expression, path.toString())) {
+ files.add(new File(path.toString()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolverFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolverFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolverFactory.java
new file mode 100644
index 0000000..b62dda8
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolverFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import org.apache.asterix.external.api.INodeResolver;
+import org.apache.asterix.external.api.INodeResolverFactory;
+
+/**
+ * Factory for creating instance of {@link NodeResolver}
+ */
+public class NodeResolverFactory implements INodeResolverFactory {
+
+ private static final INodeResolver INSTANCE = new NodeResolver();
+
+ @Override
+ public INodeResolver createNodeResolver() {
+ return INSTANCE;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetGenerator.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetGenerator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetGenerator.java
new file mode 100644
index 0000000..ec866be
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetGenerator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.util.DataGenerator.InitializationInfo;
+import org.apache.asterix.external.util.DataGenerator.TweetMessage;
+import org.apache.asterix.external.util.DataGenerator.TweetMessageIterator;
+
+public class TweetGenerator {
+
+ private static Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName());
+
+ public static final String KEY_DURATION = "duration";
+ public static final String KEY_TPS = "tps";
+ public static final String KEY_VERBOSE = "verbose";
+ public static final String KEY_FIELDS = "fields";
+ public static final int INFINITY = 0;
+
+ private static final int DEFAULT_DURATION = INFINITY;
+
+ private int duration;
+ private TweetMessageIterator tweetIterator = null;
+ private int partition;
+ private long tweetCount = 0;
+ private int frameTweetCount = 0;
+ private int numFlushedTweets = 0;
+ private DataGenerator dataGenerator = null;
+ private ByteBuffer outputBuffer = ByteBuffer.allocate(32 * 1024);
+ private String[] fields;
+ private final List<OutputStream> subscribers;
+ private final Object lock = new Object();
+ private final List<OutputStream> subscribersForRemoval = new ArrayList<OutputStream>();
+
+ public TweetGenerator(Map<String, String> configuration, int partition) throws Exception {
+ this.partition = partition;
+ String value = configuration.get(KEY_DURATION);
+ this.duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
+ dataGenerator = new DataGenerator(new InitializationInfo());
+ tweetIterator = dataGenerator.new TweetMessageIterator(duration);
+ this.fields = configuration.get(KEY_FIELDS) != null ? configuration.get(KEY_FIELDS).split(",") : null;
+ this.subscribers = new ArrayList<OutputStream>();
+ }
+
+ private void writeTweetString(TweetMessage tweetMessage) throws IOException {
+ String tweet = tweetMessage.getAdmEquivalent(fields) + "\n";
+ System.out.println(tweet);
+ tweetCount++;
+ byte[] b = tweet.getBytes();
+ if (outputBuffer.position() + b.length > outputBuffer.limit()) {
+ flush();
+ numFlushedTweets += frameTweetCount;
+ frameTweetCount = 0;
+ outputBuffer.put(b);
+ } else {
+ outputBuffer.put(b);
+ }
+ frameTweetCount++;
+ }
+
+ private void flush() throws IOException {
+ outputBuffer.flip();
+ synchronized (lock) {
+ for (OutputStream os : subscribers) {
+ try {
+ os.write(outputBuffer.array(), 0, outputBuffer.limit());
+ } catch (Exception e) {
+ subscribersForRemoval.add(os);
+ }
+ }
+ if (!subscribersForRemoval.isEmpty()) {
+ subscribers.removeAll(subscribersForRemoval);
+ subscribersForRemoval.clear();
+ }
+ }
+ outputBuffer.position(0);
+ outputBuffer.limit(32 * 1024);
+ }
+
+ public boolean generateNextBatch(int numTweets) throws Exception {
+ boolean moreData = tweetIterator.hasNext();
+ if (!moreData) {
+ if (outputBuffer.position() > 0) {
+ flush();
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Reached end of batch. Tweet Count: [" + partition + "]" + tweetCount);
+ }
+ return false;
+ } else {
+ int count = 0;
+ while (count < numTweets) {
+ writeTweetString(tweetIterator.next());
+ count++;
+ }
+ return true;
+ }
+ }
+
+ public int getNumFlushedTweets() {
+ return numFlushedTweets;
+ }
+
+ public void registerSubscriber(OutputStream os) {
+ synchronized (lock) {
+ subscribers.add(os);
+ }
+ }
+
+ public void deregisterSubscribers(OutputStream os) {
+ synchronized (lock) {
+ subscribers.remove(os);
+ }
+ }
+
+ public void close() throws IOException {
+ synchronized (lock) {
+ for (OutputStream os : subscribers) {
+ os.close();
+ }
+ }
+ }
+
+ public boolean isSubscribed() {
+ return !subscribers.isEmpty();
+ }
+
+ public long getTweetCount() {
+ return tweetCount;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
deleted file mode 100644
index f8914a6..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.util;
-
-import org.apache.asterix.external.library.java.JObjectUtil;
-import org.apache.asterix.external.util.Datatypes.Tweet;
-import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableRecord;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import twitter4j.Status;
-import twitter4j.User;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class TweetProcessor {
-
- private IAObject[] mutableTweetFields;
- private IAObject[] mutableUserFields;
- private AMutableRecord mutableRecord;
- private AMutableRecord mutableUser;
-
- private final Map<String, Integer> userFieldNameMap = new HashMap<>();
- private final Map<String, Integer> tweetFieldNameMap = new HashMap<>();
-
-
- public TweetProcessor(ARecordType recordType) {
- initFieldNames(recordType);
- mutableUserFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableInt32(0),
- new AMutableInt32(0), new AMutableString(null), new AMutableInt32(0) };
- mutableUser = new AMutableRecord((ARecordType) recordType.getFieldTypes()[tweetFieldNameMap.get(Tweet.USER)], mutableUserFields);
-
- mutableTweetFields = new IAObject[] { new AMutableString(null), mutableUser, new AMutableDouble(0),
- new AMutableDouble(0), new AMutableString(null), new AMutableString(null) };
- mutableRecord = new AMutableRecord(recordType, mutableTweetFields);
-
- }
-
- // Initialize the hashmap values for the field names and positions
- private void initFieldNames(ARecordType recordType) {
- String tweetFields[] = recordType.getFieldNames();
- for (int i=0; i<tweetFields.length; i++) {
- tweetFieldNameMap.put(tweetFields[i], i);
- if (tweetFields[i].equals(Tweet.USER)) {
- IAType fieldType = recordType.getFieldTypes()[i];
- if (fieldType.getTypeTag() == ATypeTag.RECORD) {
- String userFields[] = ((ARecordType)fieldType).getFieldNames();
- for (int j=0; j<userFields.length; j++) {
- userFieldNameMap.put(userFields[j], j);
- }
- }
-
- }
- }
- }
-
-
- public AMutableRecord processNextTweet(Status tweet) {
- User user = tweet.getUser();
-
- // Tweet user data
- ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.SCREEN_NAME)]).setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
- ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.LANGUAGE)]).setValue(JObjectUtil.getNormalizedString(user.getLang()));
- ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FRIENDS_COUNT)]).setValue(user.getFriendsCount());
- ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.STATUS_COUNT)]).setValue(user.getStatusesCount());
- ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.NAME)]).setValue(JObjectUtil.getNormalizedString(user.getName()));
- ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FOLLOWERS_COUNT)]).setValue(user.getFollowersCount());
-
-
- // Tweet data
- ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.ID)]).setValue(String.valueOf(tweet.getId()));
-
- int userPos = tweetFieldNameMap.get(Tweet.USER);
- for (int i = 0; i < mutableUserFields.length; i++) {
- ((AMutableRecord) mutableTweetFields[userPos]).setValueAtPos(i, mutableUserFields[i]);
- }
- if (tweet.getGeoLocation() != null) {
- ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(tweet.getGeoLocation().getLatitude());
- ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(tweet.getGeoLocation().getLongitude());
- } else {
- ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(0);
- ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(0);
- }
- ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.CREATED_AT)]).setValue(JObjectUtil.getNormalizedString(
- tweet.getCreatedAt().toString()));
- ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.MESSAGE)]).setValue(JObjectUtil.getNormalizedString(tweet.getText()));
-
- for (int i = 0; i < mutableTweetFields.length; i++) {
- mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
- }
-
- return mutableRecord;
-
- }
-
- public AMutableRecord getMutableRecord() {
- return mutableRecord;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
index 70bd3e1..16f8b0a 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
@@ -18,14 +18,11 @@
*/
package org.apache.asterix.external.library;
-import java.util.Random;
-
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
import org.apache.asterix.external.library.java.JObjects.JInt;
import org.apache.asterix.external.library.java.JObjects.JRecord;
import org.apache.asterix.external.library.java.JObjects.JString;
-import org.apache.asterix.external.api.IExternalScalarFunction;
-import org.apache.asterix.external.api.IFunctionHelper;
-import org.apache.asterix.external.library.java.JTypeTag;
/**
* Accepts an input record of type Open{ id: int32, text: string }
@@ -35,11 +32,8 @@ import org.apache.asterix.external.library.java.JTypeTag;
*/
public class UpperCaseFunction implements IExternalScalarFunction {
- private Random random;
-
@Override
public void initialize(IFunctionHelper functionHelper) {
- random = new Random();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index df0fb94..2fc289b 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -19,21 +19,24 @@
package org.apache.asterix.external.library.adapter;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
-import org.apache.asterix.external.dataset.adapter.StreamBasedAdapter;
+import org.apache.asterix.external.api.IFeedAdapter;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.std.file.ITupleParser;
import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-public class TestTypedAdapter extends StreamBasedAdapter {
+public class TestTypedAdapter implements IFeedAdapter {
private static final long serialVersionUID = 1L;
@@ -45,25 +48,34 @@ public class TestTypedAdapter extends StreamBasedAdapter {
private DummyGenerator generator;
+ protected final ITupleParser tupleParser;
+
+ protected final IAType sourceDatatype;
+
+ protected static final Logger LOGGER = Logger.getLogger(TestTypedAdapter.class.getName());
+
public TestTypedAdapter(ITupleParserFactory parserFactory, ARecordType sourceDatatype, IHyracksTaskContext ctx,
Map<String, String> configuration, int partition) throws IOException {
- super(parserFactory, sourceDatatype, ctx, partition);
pos = new PipedOutputStream();
pis = new PipedInputStream(pos);
this.configuration = configuration;
+ this.tupleParser = parserFactory.createTupleParser(ctx);
+ this.sourceDatatype = sourceDatatype;
}
@Override
- public InputStream getInputStream(int partition) throws IOException {
- return pis;
- }
-
- @Override
- public void start(int partition, IFrameWriter frameWriter) throws Exception {
+ public void start(int partition, IFrameWriter writer) throws Exception {
generator = new DummyGenerator(configuration, pos);
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(generator);
- super.start(partition, frameWriter);
+ if (pis != null) {
+ tupleParser.parse(pis, writer);
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(
+ "Could not obtain input stream for parsing from adapter " + this + "[" + partition + "]");
+ }
+ }
}
private static class DummyGenerator implements Runnable {
@@ -135,4 +147,13 @@ public class TestTypedAdapter extends StreamBasedAdapter {
return false;
}
+ @Override
+ public boolean pause() {
+ return false;
+ }
+
+ @Override
+ public boolean resume() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 6b08f3a..5346bf2 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -22,9 +22,9 @@ import java.io.InputStream;
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
import org.apache.asterix.common.parse.ITupleForwarder;
import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IDataSourceAdapter;
import org.apache.asterix.external.parser.ADMDataParser;
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.om.types.ARecordType;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index 71c762a..3f85ba9 100644
--- a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -24,12 +24,13 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedActivity;
-import org.apache.asterix.common.feeds.FeedConnectionRequest;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.feed.management.FeedConnectionRequest;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.watch.FeedActivity;
import org.apache.asterix.lang.aql.parser.AQLParserFactory;
import org.apache.asterix.lang.common.base.IParser;
import org.apache.asterix.lang.common.base.IParserFactory;
@@ -41,12 +42,9 @@ import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.entities.DatasourceAdapter.AdapterType;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.Function;
-import org.apache.asterix.metadata.entities.PrimaryFeed;
-import org.apache.asterix.metadata.entities.SecondaryFeed;
-import org.apache.asterix.metadata.feeds.FeedUtil;
+import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -188,14 +186,13 @@ public class SubscribeFeedStatement implements Statement {
try {
switch (feed.getFeedType()) {
case PRIMARY:
- Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+ Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> factoryOutput = null;
- factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput((PrimaryFeed) feed, policyAccessor,
- mdTxnCtx);
+ factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(feed, policyAccessor, mdTxnCtx);
outputType = factoryOutput.second.getTypeName();
break;
case SECONDARY:
- outputType = FeedUtil.getSecondaryFeedOutput((SecondaryFeed) feed, policyAccessor, mdTxnCtx);
+ outputType = FeedMetadataUtil.getSecondaryFeedOutput(feed, policyAccessor, mdTxnCtx);
break;
}
return outputType;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-lang-aql/src/main/javacc/AQL.jj
----------------------------------------------------------------------
diff --git a/asterix-lang-aql/src/main/javacc/AQL.jj b/asterix-lang-aql/src/main/javacc/AQL.jj
index 12e7897..8f62f74 100644
--- a/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -43,6 +43,7 @@ import org.apache.asterix.common.annotations.TypeDataGen;
import org.apache.asterix.common.annotations.UndeclaredFieldsDataGen;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.MetadataConstants;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.lang.aql.clause.DistinctClause;
@@ -127,7 +128,6 @@ import org.apache.asterix.lang.common.statement.WriteStatement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.struct.QuantifiedPair;
import org.apache.asterix.lang.common.struct.VarIdentifier;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
----------------------------------------------------------------------
diff --git a/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
index 49aa74b..d9d09f7 100644
--- a/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
+++ b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
@@ -20,12 +20,12 @@ package org.apache.asterix.lang.common.statement;
import java.util.Map;
+import org.apache.asterix.common.config.MetadataConstants;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
public class DatasetDecl implements Statement {
protected final Identifier name;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index 28e5af0..a1e6363 100644
--- a/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.asterix.common.config.MetadataConstants;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -95,7 +96,6 @@ import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.struct.OperatorType;
import org.apache.asterix.lang.common.struct.QuantifiedPair;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/VariableCheckAndRewriteVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/VariableCheckAndRewriteVisitor.java b/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/VariableCheckAndRewriteVisitor.java
index b016b62..f9cf99f 100644
--- a/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/VariableCheckAndRewriteVisitor.java
+++ b/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/VariableCheckAndRewriteVisitor.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.asterix.common.config.MetadataConstants;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.lang.common.base.Expression;
@@ -69,7 +70,6 @@ import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
import org.apache.asterix.lang.sqlpp.util.SqlppFormatPrintUtil;
import org.apache.asterix.lang.sqlpp.visitor.base.AbstractSqlppQueryExpressionVisitor;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
import org.apache.hyracks.algebricks.core.algebra.base.Counter;
public class VariableCheckAndRewriteVisitor extends AbstractSqlppQueryExpressionVisitor<Expression, Void> {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
----------------------------------------------------------------------
diff --git a/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 0e3fad8..547a10b 100644
--- a/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -44,6 +44,7 @@ import org.apache.asterix.common.annotations.TypeDataGen;
import org.apache.asterix.common.annotations.UndeclaredFieldsDataGen;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.MetadataConstants;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.lang.common.base.Expression;
@@ -140,7 +141,6 @@ import org.apache.asterix.lang.sqlpp.optype.JoinType;
import org.apache.asterix.lang.sqlpp.optype.SetOpType;
import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
index ea65b20..a3a81de 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
@@ -35,7 +35,7 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter;
import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.FeedPolicy;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
@@ -66,9 +66,9 @@ public class MetadataCache {
protected final Map<FunctionSignature, Function> functions = new HashMap<FunctionSignature, Function>();
// Key is adapter dataverse. Key of value map is the adapter name
protected final Map<String, Map<String, DatasourceAdapter>> adapters = new HashMap<String, Map<String, DatasourceAdapter>>();
-
+
// Key is DataverseName, Key of the value map is the Policy name
- protected final Map<String, Map<String, FeedPolicy>> feedPolicies = new HashMap<String, Map<String, FeedPolicy>>();
+ protected final Map<String, Map<String, FeedPolicyEntity>> feedPolicies = new HashMap<String, Map<String, FeedPolicyEntity>>();
// Key is library dataverse. Key of value map is the library name
protected final Map<String, Map<String, Library>> libraries = new HashMap<String, Map<String, Library>>();
// Key is library dataverse. Key of value map is the feed name
@@ -110,18 +110,17 @@ public class MetadataCache {
synchronized (datatypes) {
synchronized (functions) {
synchronized (adapters) {
- synchronized (libraries) {
- synchronized (compactionPolicies) {
- dataverses.clear();
- nodeGroups.clear();
- datasets.clear();
- indexes.clear();
- datatypes.clear();
- functions.clear();
- adapters.clear();
- libraries.clear();
- compactionPolicies.clear();
- }
+ synchronized (libraries) {
+ synchronized (compactionPolicies) {
+ dataverses.clear();
+ nodeGroups.clear();
+ datasets.clear();
+ indexes.clear();
+ datatypes.clear();
+ functions.clear();
+ adapters.clear();
+ libraries.clear();
+ compactionPolicies.clear();
}
}
}
@@ -131,9 +130,9 @@ public class MetadataCache {
}
}
}
-
+ }
- public Object addDataverseIfNotExists(Dataverse dataverse) {
+ public Dataverse addDataverseIfNotExists(Dataverse dataverse) {
synchronized (dataverses) {
synchronized (datasets) {
synchronized (datatypes) {
@@ -149,7 +148,7 @@ public class MetadataCache {
}
}
- public Object addDatasetIfNotExists(Dataset dataset) {
+ public Dataset addDatasetIfNotExists(Dataset dataset) {
synchronized (datasets) {
synchronized (indexes) {
// Add the primary index associated with the dataset, if the dataset is an
@@ -175,13 +174,13 @@ public class MetadataCache {
}
}
- public Object addIndexIfNotExists(Index index) {
+ public Index addIndexIfNotExists(Index index) {
synchronized (indexes) {
return addIndexIfNotExistsInternal(index);
}
}
- public Object addDatatypeIfNotExists(Datatype datatype) {
+ public Datatype addDatatypeIfNotExists(Datatype datatype) {
synchronized (datatypes) {
Map<String, Datatype> m = datatypes.get(datatype.getDataverseName());
if (m == null) {
@@ -195,7 +194,7 @@ public class MetadataCache {
}
}
- public Object addNodeGroupIfNotExists(NodeGroup nodeGroup) {
+ public NodeGroup addNodeGroupIfNotExists(NodeGroup nodeGroup) {
synchronized (nodeGroups) {
if (!nodeGroups.containsKey(nodeGroup.getNodeGroupName())) {
return nodeGroups.put(nodeGroup.getNodeGroupName(), nodeGroup);
@@ -204,7 +203,7 @@ public class MetadataCache {
}
}
- public Object addCompactionPolicyIfNotExists(CompactionPolicy compactionPolicy) {
+ public CompactionPolicy addCompactionPolicyIfNotExists(CompactionPolicy compactionPolicy) {
synchronized (compactionPolicy) {
Map<String, CompactionPolicy> p = compactionPolicies.get(compactionPolicy.getDataverseName());
if (p == null) {
@@ -220,17 +219,17 @@ public class MetadataCache {
}
}
- public Object dropCompactionPolicy(CompactionPolicy compactionPolicy) {
+ public CompactionPolicy dropCompactionPolicy(CompactionPolicy compactionPolicy) {
synchronized (compactionPolicies) {
Map<String, CompactionPolicy> p = compactionPolicies.get(compactionPolicy.getDataverseName());
if (p != null && p.get(compactionPolicy.getPolicyName()) != null) {
- return p.remove(compactionPolicy).getPolicyName();
+ return p.remove(compactionPolicy);
}
return null;
}
}
- public Object dropDataverse(Dataverse dataverse) {
+ public Dataverse dropDataverse(Dataverse dataverse) {
synchronized (dataverses) {
synchronized (datasets) {
synchronized (indexes) {
@@ -238,26 +237,25 @@ public class MetadataCache {
synchronized (functions) {
synchronized (adapters) {
synchronized (libraries) {
- synchronized (feeds) {
- synchronized (compactionPolicies) {
- datasets.remove(dataverse.getDataverseName());
- indexes.remove(dataverse.getDataverseName());
- datatypes.remove(dataverse.getDataverseName());
- adapters.remove(dataverse.getDataverseName());
- compactionPolicies.remove(dataverse.getDataverseName());
- List<FunctionSignature> markedFunctionsForRemoval = new ArrayList<FunctionSignature>();
- for (FunctionSignature signature : functions.keySet()) {
- if (signature.getNamespace().equals(dataverse.getDataverseName())) {
- markedFunctionsForRemoval.add(signature);
- }
- }
- for (FunctionSignature signature : markedFunctionsForRemoval) {
- functions.remove(signature);
+ synchronized (feeds) {
+ synchronized (compactionPolicies) {
+ datasets.remove(dataverse.getDataverseName());
+ indexes.remove(dataverse.getDataverseName());
+ datatypes.remove(dataverse.getDataverseName());
+ adapters.remove(dataverse.getDataverseName());
+ compactionPolicies.remove(dataverse.getDataverseName());
+ List<FunctionSignature> markedFunctionsForRemoval = new ArrayList<FunctionSignature>();
+ for (FunctionSignature signature : functions.keySet()) {
+ if (signature.getNamespace().equals(dataverse.getDataverseName())) {
+ markedFunctionsForRemoval.add(signature);
}
- libraries.remove(dataverse.getDataverseName());
- feeds.remove(dataverse.getDataverseName());
- return dataverses.remove(dataverse.getDataverseName());
}
+ for (FunctionSignature signature : markedFunctionsForRemoval) {
+ functions.remove(signature);
+ }
+ libraries.remove(dataverse.getDataverseName());
+ feeds.remove(dataverse.getDataverseName());
+ return dataverses.remove(dataverse.getDataverseName());
}
}
}
@@ -267,9 +265,9 @@ public class MetadataCache {
}
}
}
-
+ }
- public Object dropDataset(Dataset dataset) {
+ public Dataset dropDataset(Dataset dataset) {
synchronized (datasets) {
synchronized (indexes) {
@@ -289,7 +287,7 @@ public class MetadataCache {
}
}
- public Object dropIndex(Index index) {
+ public Index dropIndex(Index index) {
synchronized (indexes) {
Map<String, Map<String, Index>> datasetMap = indexes.get(index.getDataverseName());
if (datasetMap == null) {
@@ -304,7 +302,7 @@ public class MetadataCache {
}
}
- public Object dropDatatype(Datatype datatype) {
+ public Datatype dropDatatype(Datatype datatype) {
synchronized (datatypes) {
Map<String, Datatype> m = datatypes.get(datatype.getDataverseName());
if (m == null) {
@@ -314,7 +312,7 @@ public class MetadataCache {
}
}
- public Object dropNodeGroup(NodeGroup nodeGroup) {
+ public NodeGroup dropNodeGroup(NodeGroup nodeGroup) {
synchronized (nodeGroups) {
return nodeGroups.remove(nodeGroup.getNodeGroupName());
}
@@ -405,11 +403,11 @@ public class MetadataCache {
*/
protected class MetadataLogicalOperation {
// Entity to be added/dropped.
- public final IMetadataEntity entity;
+ public final IMetadataEntity<?> entity;
// True for add, false for drop.
public final boolean isAdd;
- public MetadataLogicalOperation(IMetadataEntity entity, boolean isAdd) {
+ public MetadataLogicalOperation(IMetadataEntity<?> entity, boolean isAdd) {
this.entity = entity;
this.isAdd = isAdd;
}
@@ -431,7 +429,7 @@ public class MetadataCache {
}
}
- public Object addFunctionIfNotExists(Function function) {
+ public Function addFunctionIfNotExists(Function function) {
synchronized (functions) {
FunctionSignature signature = new FunctionSignature(function.getDataverseName(), function.getName(),
function.getArity());
@@ -443,7 +441,7 @@ public class MetadataCache {
}
}
- public Object dropFunction(Function function) {
+ public Function dropFunction(Function function) {
synchronized (functions) {
FunctionSignature signature = new FunctionSignature(function.getDataverseName(), function.getName(),
function.getArity());
@@ -455,11 +453,11 @@ public class MetadataCache {
}
}
- public Object addFeedPolicyIfNotExists(FeedPolicy feedPolicy) {
+ public Object addFeedPolicyIfNotExists(FeedPolicyEntity feedPolicy) {
synchronized (feedPolicy) {
- Map<String, FeedPolicy> p = feedPolicies.get(feedPolicy.getDataverseName());
+ Map<String, FeedPolicyEntity> p = feedPolicies.get(feedPolicy.getDataverseName());
if (p == null) {
- p = new HashMap<String, FeedPolicy>();
+ p = new HashMap<String, FeedPolicyEntity>();
p.put(feedPolicy.getPolicyName(), feedPolicy);
feedPolicies.put(feedPolicy.getDataverseName(), p);
} else {
@@ -471,9 +469,9 @@ public class MetadataCache {
}
}
- public Object dropFeedPolicy(FeedPolicy feedPolicy) {
+ public Object dropFeedPolicy(FeedPolicyEntity feedPolicy) {
synchronized (feedPolicies) {
- Map<String, FeedPolicy> p = feedPolicies.get(feedPolicy.getDataverseName());
+ Map<String, FeedPolicyEntity> p = feedPolicies.get(feedPolicy.getDataverseName());
if (p != null && p.get(feedPolicy.getPolicyName()) != null) {
return p.remove(feedPolicy).getPolicyName();
}
@@ -481,10 +479,10 @@ public class MetadataCache {
}
}
- public Object addAdapterIfNotExists(DatasourceAdapter adapter) {
+ public DatasourceAdapter addAdapterIfNotExists(DatasourceAdapter adapter) {
synchronized (adapters) {
- Map<String, DatasourceAdapter> adaptersInDataverse = adapters.get(adapter.getAdapterIdentifier()
- .getNamespace());
+ Map<String, DatasourceAdapter> adaptersInDataverse = adapters
+ .get(adapter.getAdapterIdentifier().getNamespace());
if (adaptersInDataverse == null) {
adaptersInDataverse = new HashMap<String, DatasourceAdapter>();
adapters.put(adapter.getAdapterIdentifier().getNamespace(), adaptersInDataverse);
@@ -497,10 +495,10 @@ public class MetadataCache {
}
}
- public Object dropAdapter(DatasourceAdapter adapter) {
+ public DatasourceAdapter dropAdapter(DatasourceAdapter adapter) {
synchronized (adapters) {
- Map<String, DatasourceAdapter> adaptersInDataverse = adapters.get(adapter.getAdapterIdentifier()
- .getNamespace());
+ Map<String, DatasourceAdapter> adaptersInDataverse = adapters
+ .get(adapter.getAdapterIdentifier().getNamespace());
if (adaptersInDataverse != null) {
return adaptersInDataverse.remove(adapter.getAdapterIdentifier().getName());
}
@@ -508,10 +506,7 @@ public class MetadataCache {
}
}
-
-
-
- public Object addLibraryIfNotExists(Library library) {
+ public Library addLibraryIfNotExists(Library library) {
synchronized (libraries) {
Map<String, Library> libsInDataverse = libraries.get(library.getDataverseName());
boolean needToAddd = (libsInDataverse == null || libsInDataverse.get(library.getName()) != null);
@@ -526,7 +521,7 @@ public class MetadataCache {
}
}
- public Object dropLibrary(Library library) {
+ public Library dropLibrary(Library library) {
synchronized (libraries) {
Map<String, Library> librariesInDataverse = libraries.get(library.getDataverseName());
if (librariesInDataverse != null) {
@@ -536,12 +531,11 @@ public class MetadataCache {
}
}
- public Object addFeedIfNotExists(Feed feed) {
- // TODO Auto-generated method stub
+ public Feed addFeedIfNotExists(Feed feed) {
return null;
}
- public Object dropFeed(Feed feed) {
+ public Feed dropFeed(Feed feed) {
synchronized (feeds) {
Map<String, Feed> feedsInDataverse = feeds.get(feed.getDataverseName());
if (feedsInDataverse != null) {
@@ -551,7 +545,7 @@ public class MetadataCache {
}
}
- private Object addIndexIfNotExistsInternal(Index index) {
+ private Index addIndexIfNotExistsInternal(Index index) {
Map<String, Map<String, Index>> datasetMap = indexes.get(index.getDataverseName());
if (datasetMap == null) {
datasetMap = new HashMap<String, Map<String, Index>>();