You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2016/04/07 16:59:49 UTC
[14/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master'
into hyracks-merge2
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
index 4eec348,0000000..ea5cc8f
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
@@@ -1,276 -1,0 +1,283 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchEvent.Kind;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
++import java.util.List;
++import java.util.concurrent.locks.ReentrantLock;
+
- import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class FileSystemWatcher {
+
+ private static final Logger LOGGER = Logger.getLogger(FileSystemWatcher.class.getName());
+ private WatchService watcher;
+ private final HashMap<WatchKey, Path> keys;
+ private final LinkedList<File> files = new LinkedList<File>();
+ private Iterator<File> it;
+ private final String expression;
+ private FeedLogManager logManager;
- private final Path path;
++ private final List<Path> paths;
+ private final boolean isFeed;
+ private boolean done;
- private File current;
- private AbstractFeedDataFlowController controller;
+ private final LinkedList<Path> dirs;
++ private final ReentrantLock lock = new ReentrantLock();
+
- public FileSystemWatcher(Path inputResource, String expression, boolean isFeed) {
++ public FileSystemWatcher(List<Path> inputResources, String expression, boolean isFeed) throws HyracksDataException {
++ this.isFeed = isFeed;
+ this.keys = isFeed ? new HashMap<WatchKey, Path>() : null;
+ this.expression = expression;
- this.path = inputResource;
- this.isFeed = isFeed;
++ this.paths = inputResources;
+ this.dirs = new LinkedList<Path>();
++ if (!isFeed) {
++ init();
++ }
+ }
+
- public void setFeedLogManager(FeedLogManager feedLogManager) {
- this.logManager = feedLogManager;
++ public synchronized void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
++ if (logManager == null) {
++ this.logManager = feedLogManager;
++ init();
++ }
+ }
+
- public void init() throws HyracksDataException {
++ public synchronized void init() throws HyracksDataException {
+ try {
+ dirs.clear();
- LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs);
- it = files.iterator();
- if (isFeed) {
- keys.clear();
- if (watcher != null) {
- try {
- watcher.close();
- } catch (IOException e) {
- LOGGER.warn("Failed to close watcher service", e);
++ for (Path path : paths) {
++ LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs);
++ it = files.iterator();
++ if (isFeed) {
++ keys.clear();
++ if (watcher != null) {
++ try {
++ watcher.close();
++ } catch (IOException e) {
++ LOGGER.warn("Failed to close watcher service", e);
++ }
++ }
++ watcher = FileSystems.getDefault().newWatchService();
++ for (Path dirPath : dirs) {
++ register(dirPath);
++ }
++ resume();
++ } else {
++ if (files.isEmpty()) {
++ throw new HyracksDataException(path + ": no files found");
+ }
+ }
- watcher = FileSystems.getDefault().newWatchService();
- for (Path path : dirs) {
- register(path);
- }
- resume();
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ /**
+ * Register the given directory, and all its sub-directories, with the
+ * WatchService.
+ */
+ private void register(Path dir) throws IOException {
+ WatchKey key = dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE,
+ StandardWatchEventKinds.ENTRY_MODIFY);
+ keys.put(key, dir);
+ }
+
- private void resume() throws IOException {
++ private synchronized void resume() throws IOException {
+ if (logManager == null) {
+ return;
+ }
+ /*
+ * Done processing the progress log file. We now have:
+ * the files that were completed.
+ */
+
+ if (it == null) {
+ return;
+ }
+ while (it.hasNext()) {
+ File file = it.next();
+ if (logManager.isSplitRead(file.getAbsolutePath())) {
+ // File was read completely, remove it from the files list
+ it.remove();
+ }
+ }
+ // reset the iterator
+ it = files.iterator();
+ }
+
+ @SuppressWarnings("unchecked")
+ static <T> WatchEvent<T> cast(WatchEvent<?> event) {
+ return (WatchEvent<T>) event;
+ }
+
+ private void handleEvents(WatchKey key) throws IOException {
+ // get dir associated with the key
+ Path dir = keys.get(key);
+ if (dir == null) {
+ // This should never happen
+ if (LOGGER.isEnabledFor(Level.WARN)) {
+ LOGGER.warn("WatchKey not recognized!!");
+ }
+ return;
+ }
+ for (WatchEvent<?> event : key.pollEvents()) {
+ Kind<?> kind = event.kind();
- // TODO: Do something about overflow events
+ // An overflow event means that some events were dropped
+ if (kind == StandardWatchEventKinds.OVERFLOW) {
+ if (LOGGER.isEnabledFor(Level.WARN)) {
+ LOGGER.warn("Overflow event. Some events might have been missed");
+ }
+ // need to read and validate all files.
- //TODO: use btrees for all logs
+ init();
+ return;
+ }
+
+ // Context for directory entry event is the file name of entry
+ WatchEvent<Path> ev = cast(event);
+ Path name = ev.context();
+ Path child = dir.resolve(name);
+ // if directory is created then register it and its sub-directories
+ if ((kind == StandardWatchEventKinds.ENTRY_CREATE)) {
+ try {
+ if (Files.isDirectory(child, LinkOption.NOFOLLOW_LINKS)) {
+ register(child);
+ } else {
+ // it is a file, add it to the files list.
+ LocalFileSystemUtils.validateAndAdd(child, expression, files);
+ }
+ } catch (IOException e) {
+ if (LOGGER.isEnabledFor(Level.ERROR)) {
+ LOGGER.error(e);
+ }
+ }
+ }
+ }
++ it = files.iterator();
+ }
+
- public void close() throws IOException {
++ public synchronized void close() throws IOException {
+ if (!done) {
+ if (watcher != null) {
+ watcher.close();
+ watcher = null;
+ }
- if (logManager != null) {
- if (current != null) {
- logManager.startPartition(current.getAbsolutePath());
- logManager.endPartition();
- }
- logManager.close();
- current = null;
- }
+ done = true;
+ }
+ }
+
- public File next() throws IOException {
- if ((current != null) && (logManager != null)) {
- logManager.startPartition(current.getAbsolutePath());
- logManager.endPartition();
- }
- current = it.next();
- return current;
- }
-
- private boolean endOfEvents(WatchKey key) {
- // reset key and remove from set if directory no longer accessible
- if (!key.reset()) {
- keys.remove(key);
- if (keys.isEmpty()) {
- return true;
- }
- }
- return false;
- }
-
- public boolean hasNext() throws IOException {
++ // poll is not blocking
++ public synchronized File poll() throws IOException {
+ if (it.hasNext()) {
- return true;
++ return it.next();
+ }
+ if (done || !isFeed) {
- return false;
++ return null;
+ }
+ files.clear();
++ it = files.iterator();
+ if (keys.isEmpty()) {
- return false;
++ close();
++ return null;
+ }
+ // Read new Events (Polling first to add all available files)
+ WatchKey key;
+ key = watcher.poll();
+ while (key != null) {
+ handleEvents(key);
+ if (endOfEvents(key)) {
+ close();
- return false;
++ return null;
+ }
+ key = watcher.poll();
+ }
- // No file was found, wait for the filesystem to push events
- if (controller != null) {
- controller.flush();
++ return null;
++ }
++
++ // take is blocking
++ public synchronized File take() throws IOException {
++ File next = poll();
++ if (next != null) {
++ return next;
+ }
- while (files.isEmpty()) {
- try {
- key = watcher.take();
- } catch (InterruptedException x) {
- if (LOGGER.isEnabledFor(Level.WARN)) {
- LOGGER.warn("Feed Closed");
- }
- if (watcher == null) {
- return false;
- }
- continue;
- } catch (ClosedWatchServiceException e) {
- if (LOGGER.isEnabledFor(Level.WARN)) {
- LOGGER.warn("The watcher has exited");
++ if (done || !isFeed) {
++ return null;
++ }
++ // No file was found, wait for the filesystem to push events
++ WatchKey key = null;
++ lock.lock();
++ try {
++ while (!it.hasNext()) {
++ try {
++ key = watcher.take();
++ } catch (InterruptedException x) {
++ if (LOGGER.isEnabledFor(Level.WARN)) {
++ LOGGER.warn("Feed Closed");
++ }
++ if (watcher == null) {
++ return null;
++ }
++ continue;
++ } catch (ClosedWatchServiceException e) {
++ if (LOGGER.isEnabledFor(Level.WARN)) {
++ LOGGER.warn("The watcher has exited");
++ }
++ if (watcher == null) {
++ return null;
++ }
++ continue;
+ }
- if (watcher == null) {
- return false;
++ handleEvents(key);
++ if (endOfEvents(key)) {
++ return null;
+ }
- continue;
- }
- handleEvents(key);
- if (endOfEvents(key)) {
- return false;
+ }
++ } finally {
++ lock.unlock();
+ }
+ // files were found, re-create the iterator and move it one step
- it = files.iterator();
- return it.hasNext();
++ return it.next();
+ }
+
- public void setController(AbstractFeedDataFlowController controller) {
- this.controller = controller;
++ private boolean endOfEvents(WatchKey key) {
++ // reset key and remove from set if directory no longer accessible
++ if (!key.reset()) {
++ keys.remove(key);
++ if (keys.isEmpty()) {
++ return true;
++ }
++ }
++ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
index d6e9463,0000000..16dd1e9
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
@@@ -1,75 -1,0 +1,76 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.LinkedList;
+import java.util.regex.Pattern;
+
++import org.apache.hyracks.api.exceptions.HyracksDataException;
++
+public class LocalFileSystemUtils {
+
- //TODO: replace this method by FileUtils.iterateFilesAndDirs(.)
+ public static void traverse(final LinkedList<File> files, File root, final String expression,
+ final LinkedList<Path> dirs) throws IOException {
- if (!Files.exists(root.toPath())) {
- return;
++ final Path path = root.toPath();
++ if (!Files.exists(path)) {
++ throw new HyracksDataException(path + ": path not found");
+ }
- if (!Files.isDirectory(root.toPath())) {
- validateAndAdd(root.toPath(), expression, files);
++ if (!Files.isDirectory(path)) {
++ validateAndAdd(path, expression, files);
+ }
- //FileUtils.iterateFilesAndDirs(directory, fileFilter, dirFilter)
- Files.walkFileTree(root.toPath(), new SimpleFileVisitor<Path>() {
++ Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes attrs) throws IOException {
+ if (!Files.exists(path, LinkOption.NOFOLLOW_LINKS)) {
+ return FileVisitResult.TERMINATE;
+ }
+ if (Files.isDirectory(path, LinkOption.NOFOLLOW_LINKS)) {
+ if (dirs != null) {
+ dirs.add(path);
+ }
+ //get immediate children files
+ File[] content = path.toFile().listFiles();
+ for (File file : content) {
+ if (!file.isDirectory()) {
+ validateAndAdd(file.toPath(), expression, files);
+ }
+ }
+ } else {
+ // Path is a file, add to list of files if it matches the expression
+ validateAndAdd(path, expression, files);
+ }
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ }
+
+ public static void validateAndAdd(Path path, String expression, LinkedList<File> files) {
+ if (expression == null || Pattern.matches(expression, path.toString())) {
+ files.add(new File(path.toString()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
index d822310,0000000..493bd3b
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
@@@ -1,110 -1,0 +1,189 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.classad.test;
+
++import java.io.File;
++import java.io.PrintStream;
++import java.nio.file.Files;
++import java.nio.file.Path;
+import java.nio.file.Paths;
++import java.util.ArrayList;
++import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.classad.CaseInsensitiveString;
+import org.apache.asterix.external.classad.CharArrayLexerSource;
+import org.apache.asterix.external.classad.ClassAd;
+import org.apache.asterix.external.classad.ExprTree;
+import org.apache.asterix.external.classad.Value;
+import org.apache.asterix.external.classad.object.pool.ClassAdObjectPool;
+import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader;
+import org.apache.asterix.external.input.stream.LocalFSInputStream;
+import org.apache.asterix.external.library.ClassAdParser;
- import org.apache.hyracks.api.io.FileReference;
- import org.apache.hyracks.dataflow.std.file.FileSplit;
++import org.apache.asterix.external.util.FileSystemWatcher;
++import org.apache.asterix.formats.nontagged.AqlADMPrinterFactoryProvider;
++import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
++import org.apache.asterix.om.types.ARecordType;
++import org.apache.asterix.om.types.BuiltinType;
++import org.apache.asterix.om.types.IAType;
++import org.apache.commons.io.FileUtils;
++import org.apache.hyracks.algebricks.data.IPrinter;
++import org.apache.hyracks.algebricks.data.IPrinterFactory;
++import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
++import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
++import org.junit.Assert;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class ClassAdToADMTest extends TestCase {
+ /**
+ * Create the test case
+ *
+ * @param testName
+ * name of the test case
+ */
+ public ClassAdToADMTest(String testName) {
+ super(testName);
+ }
+
+ /**
+ * @return the suite of tests being tested
+ */
+ public static Test suite() {
+ return new TestSuite(ClassAdToADMTest.class);
+ }
+
++ private void printTuple(ArrayTupleBuilder tb, IPrinter[] printers, PrintStream printStream)
++ throws HyracksDataException {
++ int[] offsets = tb.getFieldEndOffsets();
++ for (int i = 0; i < printers.length; i++) {
++ int offset = i == 0 ? 0 : offsets[i - 1];
++ int length = i == 0 ? offsets[0] : offsets[i] - offsets[i - 1];
++ printers[i].print(tb.getByteArray(), offset, length, printStream);
++ printStream.println();
++ }
++ }
++
++ @SuppressWarnings("rawtypes")
++ public void testSchemaful() {
++ try {
++ File file = new File("target/classad-wtih-temporals.adm");
++ File expected = new File(getClass().getResource("/results/classad-with-temporals.adm").toURI().getPath());
++ FileUtils.deleteQuietly(file);
++ PrintStream printStream = new PrintStream(Files.newOutputStream(Paths.get(file.toURI())));
++ String[] recordFieldNames = { "GlobalJobId", "Owner", "ClusterId", "ProcId", "RemoteWallClockTime",
++ "CompletionDate", "QDate", "JobCurrentStartDate", "JobStartDate", "JobCurrentStartExecutingDate" };
++ IAType[] recordFieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32,
++ BuiltinType.AINT32, BuiltinType.ADURATION, BuiltinType.ADATETIME, BuiltinType.ADATETIME,
++ BuiltinType.ADATETIME, BuiltinType.ADATETIME, BuiltinType.ADATETIME };
++ ARecordType recordType = new ARecordType("value", recordFieldNames, recordFieldTypes, true);
++ int numOfTupleFields = 1;
++ ISerializerDeserializer[] serdes = new ISerializerDeserializer[1];
++ serdes[0] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(recordType);
++ IPrinterFactory[] printerFactories = new IPrinterFactory[1];
++ printerFactories[0] = AqlADMPrinterFactoryProvider.INSTANCE.getPrinterFactory(recordType);
++ // create output descriptor
++ IPrinter[] printers = new IPrinter[printerFactories.length];
++ for (int i = 0; i < printerFactories.length; i++) {
++ printers[i] = printerFactories[i].createPrinter();
++ }
++ ClassAdObjectPool objectPool = new ClassAdObjectPool();
++ String[] files = new String[] { "/classad-with-temporals.classads" };
++ ClassAdParser parser = new ClassAdParser(recordType, false, false, false, null, null, null, objectPool);
++ ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields);
++ for (String path : files) {
++ List<Path> paths = new ArrayList<>();
++ paths.add(Paths.get(getClass().getResource(path).toURI()));
++ FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
++ LocalFSInputStream in = new LocalFSInputStream(watcher);
++ SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, "[", "]");
++ while (recordReader.hasNext()) {
++ tb.reset();
++ IRawRecord<char[]> record = recordReader.next();
++ parser.parse(record, tb.getDataOutput());
++ tb.addFieldEndOffset();
++ printTuple(tb, printers, printStream);
++ }
++ recordReader.close();
++ printStream.close();
++ Assert.assertTrue(FileUtils.contentEquals(file, expected));
++ }
++ } catch (Throwable th) {
++ System.err.println("TEST FAILED");
++ th.printStackTrace();
++ Assert.assertTrue(false);
++ }
++ System.err.println("TEST PASSED");
++ }
++
+ /**
+ *
+ */
- public void test() {
++ public void testSchemaless() {
+ try {
- // test here
+ ClassAdObjectPool objectPool = new ClassAdObjectPool();
+ ClassAd pAd = new ClassAd(objectPool);
+ String[] files = new String[] { "/jobads.txt" };
+ ClassAdParser parser = new ClassAdParser(objectPool);
+ CharArrayLexerSource lexerSource = new CharArrayLexerSource();
+ for (String path : files) {
- LocalFSInputStream in = new LocalFSInputStream(
- new FileSplit[] { new FileSplit("",
- new FileReference(Paths.get(getClass().getResource(path).toURI()).toFile())) },
- null, null, 0, null, false);
- SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, null, "[", "]");
++ List<Path> paths = new ArrayList<>();
++ paths.add(Paths.get(getClass().getResource(path).toURI()));
++ FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
++ LocalFSInputStream in = new LocalFSInputStream(watcher);
++ SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, "[", "]");
+ Value val = new Value(objectPool);
+ while (recordReader.hasNext()) {
+ val.reset();
+ IRawRecord<char[]> record = recordReader.next();
+ lexerSource.setNewSource(record.get());
+ parser.setLexerSource(lexerSource);
+ parser.parseNext(pAd);
+ Map<CaseInsensitiveString, ExprTree> attrs = pAd.getAttrList();
+ for (Entry<CaseInsensitiveString, ExprTree> entry : attrs.entrySet()) {
+ ExprTree tree = entry.getValue();
+ switch (tree.getKind()) {
+ case ATTRREF_NODE:
+ case CLASSAD_NODE:
+ case EXPR_ENVELOPE:
+ case EXPR_LIST_NODE:
+ case FN_CALL_NODE:
+ case OP_NODE:
+ break;
+ case LITERAL_NODE:
+ break;
+ default:
+ System.out.println("Something is wrong");
+ break;
+ }
+ }
+ }
+ recordReader.close();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertTrue(false);
+ }
+ }
+}