You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:01 UTC

[03/21] incubator-asterixdb git commit: First stage of external data cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
deleted file mode 100644
index c06caef..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class CounterTimerTupleForwardPolicy implements ITupleForwardPolicy {
-
-    public static final String BATCH_SIZE = "batch-size";
-    public static final String BATCH_INTERVAL = "batch-interval";
-
-    private static final Logger LOGGER = Logger.getLogger(CounterTimerTupleForwardPolicy.class.getName());
-   
-    private FrameTupleAppender appender;
-    private IFrame frame;
-    private IFrameWriter writer;
-    private int batchSize;
-    private long batchInterval;
-    private int tuplesInFrame = 0;
-    private TimeBasedFlushTask flushTask;
-    private Timer timer;
-    private Object lock = new Object();
-    private boolean activeTimer = false;
-
-    public void configure(Map<String, String> configuration) {
-        String propValue = (String) configuration.get(BATCH_SIZE);
-        if (propValue != null) {
-            batchSize = Integer.parseInt(propValue);
-        } else {
-            batchSize = -1;
-        }
-
-        propValue = (String) configuration.get(BATCH_INTERVAL);
-        if (propValue != null) {
-            batchInterval = Long.parseLong(propValue);
-            activeTimer = true;
-        }
-    }
-
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
-        this.appender = new FrameTupleAppender();
-        this.frame = new VSizeFrame(ctx);
-        appender.reset(frame, true);
-        this.writer = writer;
-        if (activeTimer) {
-            this.timer = new Timer();
-            this.flushTask = new TimeBasedFlushTask(writer, lock);
-            timer.scheduleAtFixedRate(flushTask, 0, batchInterval);
-        }
-    }
-
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
-        if (activeTimer) {
-            synchronized (lock) {
-                addTupleToFrame(tb);
-            }
-        } else {
-            addTupleToFrame(tb);
-        }
-        tuplesInFrame++;
-    }
-
-    private void addTupleToFrame(ArrayTupleBuilder tb) throws HyracksDataException {
-        if (tuplesInFrame == batchSize || !appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("flushing frame containg (" + tuplesInFrame + ") tuples");
-            }
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-            tuplesInFrame = 0;
-            appender.reset(frame, true);
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new IllegalStateException();
-            }
-        }
-    }
-
-    public void close() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            if (activeTimer) {
-                synchronized (lock) {
-                    FrameUtils.flushFrame(frame.getBuffer(), writer);
-                }
-            } else {
-                FrameUtils.flushFrame(frame.getBuffer(), writer);
-            }
-        }
-
-        if (timer != null) {
-            timer.cancel();
-        }
-    }
-
-    private class TimeBasedFlushTask extends TimerTask {
-
-        private IFrameWriter writer;
-        private final Object lock;
-
-        public TimeBasedFlushTask(IFrameWriter writer, Object lock) {
-            this.writer = writer;
-            this.lock = lock;
-        }
-
-        @Override
-        public void run() {
-            try {
-                if (tuplesInFrame > 0) {
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("TTL expired flushing frame (" + tuplesInFrame + ")");
-                    }
-                    synchronized (lock) {
-                        FrameUtils.flushFrame(frame.getBuffer(), writer);
-                        appender.reset(frame, true);
-                        tuplesInFrame = 0;
-                    }
-                }
-            } catch (HyracksDataException e) {
-                e.printStackTrace();
-            }
-        }
-
-    }
-
-    @Override
-    public TupleForwardPolicyType getType() {
-        return TupleForwardPolicyType.COUNTER_TIMER_EXPIRED;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/DelimitedDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/DelimitedDataParser.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/DelimitedDataParser.java
deleted file mode 100644
index ca7c5c4..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/DelimitedDataParser.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import org.apache.asterix.builders.IARecordBuilder;
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.dataflow.data.nontagged.serde.ANullSerializerDeserializer;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.ANull;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.std.file.FieldCursorForDelimitedDataParser;
-
-public class DelimitedDataParser extends AbstractDataParser implements IDataParser {
-
-    protected final IValueParserFactory[] valueParserFactories;
-    protected final char fieldDelimiter;
-    protected final char quote;
-    protected final boolean hasHeader;
-    protected final ARecordType recordType;
-    private IARecordBuilder recBuilder;
-    private ArrayBackedValueStorage fieldValueBuffer;
-    private DataOutput fieldValueBufferOutput;
-    private IValueParser[] valueParsers;
-    private FieldCursorForDelimitedDataParser cursor;
-    private byte[] fieldTypeTags;
-    private int[] fldIds;
-    private ArrayBackedValueStorage[] nameBuffers;
-    private boolean areAllNullFields;
-
-    public DelimitedDataParser(ARecordType recordType, IValueParserFactory[] valueParserFactories, char fieldDelimter,
-            char quote, boolean hasHeader) {
-        this.recordType = recordType;
-        this.valueParserFactories = valueParserFactories;
-        this.fieldDelimiter = fieldDelimter;
-        this.quote = quote;
-        this.hasHeader = hasHeader;
-    }
-
-    @Override
-    public void initialize(InputStream in, ARecordType recordType, boolean datasetRec) throws AsterixException,
-            IOException {
-
-        valueParsers = new IValueParser[valueParserFactories.length];
-        for (int i = 0; i < valueParserFactories.length; ++i) {
-            valueParsers[i] = valueParserFactories[i].createValueParser();
-        }
-
-        fieldValueBuffer = new ArrayBackedValueStorage();
-        fieldValueBufferOutput = fieldValueBuffer.getDataOutput();
-        recBuilder = new RecordBuilder();
-        recBuilder.reset(recordType);
-        recBuilder.init();
-
-        int n = recordType.getFieldNames().length;
-        fieldTypeTags = new byte[n];
-        for (int i = 0; i < n; i++) {
-            ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
-            fieldTypeTags[i] = tag.serialize();
-        }
-
-        fldIds = new int[n];
-        nameBuffers = new ArrayBackedValueStorage[n];
-        AMutableString str = new AMutableString(null);
-        for (int i = 0; i < n; i++) {
-            String name = recordType.getFieldNames()[i];
-            fldIds[i] = recBuilder.getFieldId(name);
-            if (fldIds[i] < 0) {
-                if (!recordType.isOpen()) {
-                    throw new HyracksDataException("Illegal field " + name + " in closed type " + recordType);
-                } else {
-                    nameBuffers[i] = new ArrayBackedValueStorage();
-                    fieldNameToBytes(name, str, nameBuffers[i]);
-                }
-            }
-        }
-
-        cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote);
-    }
-
-    @Override
-    public boolean parse(DataOutput out) throws AsterixException, IOException {
-        if (hasHeader && cursor.recordCount == 0) {
-            // Skip the header record by consuming all of its fields
-            cursor.nextRecord();
-            while (cursor.nextField());
-        }
-        while (cursor.nextRecord()) {
-            recBuilder.reset(recordType);
-            recBuilder.init();
-            areAllNullFields = true;
-
-            for (int i = 0; i < valueParsers.length; ++i) {
-                if (!cursor.nextField()) {
-                    break;
-                }
-                fieldValueBuffer.reset();
-
-                if (cursor.fStart == cursor.fEnd && recordType.getFieldTypes()[i].getTypeTag() != ATypeTag.STRING
-                        && recordType.getFieldTypes()[i].getTypeTag() != ATypeTag.NULL) {
-                    // The field is empty: if its type is optional, insert NULL.
-                    // String fields never reach this branch, because an empty
-                    // string field is parsed as an empty string instead.
-                    if (!NonTaggedFormatUtil.isOptional(recordType.getFieldTypes()[i])) {
-                        throw new AsterixException("At record: " + cursor.recordCount + " - Field " + cursor.fieldCount
-                                + " is not an optional type so it cannot accept null value. ");
-                    }
-                    fieldValueBufferOutput.writeByte(ATypeTag.NULL.serialize());
-                    ANullSerializerDeserializer.INSTANCE.serialize(ANull.NULL, out);
-                } else {
-                    fieldValueBufferOutput.writeByte(fieldTypeTags[i]);
-                    // Eliminate double quotes in the field that we are going to parse
-                    if (cursor.isDoubleQuoteIncludedInThisField) {
-                        cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
-                        cursor.fEnd -= cursor.doubleQuoteCount;
-                        cursor.isDoubleQuoteIncludedInThisField = false;
-                    }
-                    valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart,
-                            fieldValueBufferOutput);
-                    areAllNullFields = false;
-                }
-                if (fldIds[i] < 0) {
-                    recBuilder.addField(nameBuffers[i], fieldValueBuffer);
-                } else {
-                    recBuilder.addField(fldIds[i], fieldValueBuffer);
-                }
-            }
-
-            if (!areAllNullFields) {
-                recBuilder.write(out, true);
-                return true;
-            }
-        }
-        return false;
-    }
-
-    protected void fieldNameToBytes(String fieldName, AMutableString str, ArrayBackedValueStorage buffer)
-            throws HyracksDataException {
-        buffer.reset();
-        DataOutput out = buffer.getDataOutput();
-        str.setValue(fieldName);
-        try {
-            stringSerde.serialize(str, out);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
deleted file mode 100644
index e22180c..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.util.Map;
-
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class FrameFullTupleForwardPolicy implements ITupleForwardPolicy {
-
-	private FrameTupleAppender appender;
-	private IFrame frame;
-	private IFrameWriter writer;
-
-	public void configure(Map<String, String> configuration) {
-		// no-op
-	}
-
-	public void initialize(IHyracksCommonContext ctx, IFrameWriter writer)
-			throws HyracksDataException {
-		this.appender = new FrameTupleAppender();
-		this.frame = new VSizeFrame(ctx);
-		this.writer = writer;
-		appender.reset(frame, true);
-	}
-
-	public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
-		boolean success = appender.append(tb.getFieldEndOffsets(),
-				tb.getByteArray(), 0, tb.getSize());
-		if (!success) {
-			FrameUtils.flushFrame(frame.getBuffer(), writer);
-			appender.reset(frame, true);
-			success = appender.append(tb.getFieldEndOffsets(),
-					tb.getByteArray(), 0, tb.getSize());
-			if (!success) {
-				throw new IllegalStateException();
-			}
-		}
-	}
-
-	public void close() throws HyracksDataException {
-		if (appender.getTupleCount() > 0) {
-			FrameUtils.flushFrame(frame.getBuffer(), writer);
-		}
-
-	}
-
-	@Override
-	public TupleForwardPolicyType getType() {
-		return TupleForwardPolicyType.FRAME_FULL;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/IDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/IDataParser.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/IDataParser.java
deleted file mode 100644
index ba90e6c..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/IDataParser.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.om.types.ARecordType;
-
-/**
- * Interface implemented by parsers that read an input stream and output ADM records.
- */
-public interface IDataParser {
-
-    /**
-     * Initialize the parser prior to actual parsing.
-     * 
-     * @param in
-     *            input stream to be parsed
-     * @param recordType
-     *            record type associated with input data
-     * @param datasetRec
-     *            boolean flag set to true if input data represents dataset
-     *            records.
-     * @throws AsterixException
-     * @throws IOException
-     */
-    public void initialize(InputStream in, ARecordType recordType, boolean datasetRec) throws AsterixException,
-            IOException;
-
-    /**
-     * Parse data from source input stream and output ADM records.
-     * 
-     * @param out
-     *            DataOutput instance to which the parser output is written.
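-     * @return true if a record was written to out, false otherwise (DelimitedDataParser returns false once no records remain)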
-     * @throws AsterixException
-     * @throws IOException
-     */
-    public boolean parse(DataOutput out) throws AsterixException, IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateContolledParserPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateContolledParserPolicy.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateContolledParserPolicy.java
deleted file mode 100644
index 7b5d331..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateContolledParserPolicy.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.util.Map;
-
-import org.apache.asterix.common.parse.ITupleParserPolicy;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class RateContolledParserPolicy implements ITupleParserPolicy {
-
-    protected FrameTupleAppender appender;
-    protected IFrame  frame;
-    private IFrameWriter writer;
-    private long interTupleInterval;
-    private boolean delayConfigured;
-
-    public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
-
-    public RateContolledParserPolicy() {
-
-    }
-
-    public TupleParserPolicy getType() {
-        return ITupleParserPolicy.TupleParserPolicy.FRAME_FULL;
-    }
-
- 
-    @Override
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
-        if (delayConfigured) {
-            try {
-                Thread.sleep(interTupleInterval);
-            } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-        boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
-        if (!success) {
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-            appender.reset(frame, true);
-            success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
-            if (!success) {
-                throw new IllegalStateException();
-            }
-        }
-        appender.reset(frame, true);
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-        }
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws HyracksDataException {
-        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
-        if (propValue != null) {
-            interTupleInterval = Long.parseLong(propValue);
-        } else {
-            interTupleInterval = 0;
-        }
-        delayConfigured = interTupleInterval != 0;
-        
-    }
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
-        this.appender = new FrameTupleAppender();
-        this.frame = new VSizeFrame(ctx);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
deleted file mode 100644
index c5af720..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.util.Map;
-
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class RateControlledTupleForwardPolicy implements ITupleForwardPolicy {
-
-    private FrameTupleAppender appender;
-    private IFrame frame;
-    private IFrameWriter writer;
-    private long interTupleInterval;
-    private boolean delayConfigured;
-
-    public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
-
-    public void configure(Map<String, String> configuration) {
-        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
-        if (propValue != null) {
-            interTupleInterval = Long.parseLong(propValue);
-        }
-        delayConfigured = interTupleInterval != 0;
-    }
-
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
-        this.appender = new FrameTupleAppender();
-        this.frame = new VSizeFrame(ctx);
-        this.writer = writer;
-        appender.reset(frame, true);
-    }
-
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
-        if (delayConfigured) {
-            try {
-                Thread.sleep(interTupleInterval);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-        boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
-        if (!success) {
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-            appender.reset(frame, true);
-            success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
-            if (!success) {
-                throw new IllegalStateException();
-            }
-        }
-    }
-
-    public void close() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-        }
-
-    }
-
-    @Override
-    public TupleForwardPolicyType getType() {
-        return TupleForwardPolicyType.RATE_CONTROLLED;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/resources/adm.grammar
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/resources/adm.grammar b/asterix-runtime/src/main/resources/adm.grammar
deleted file mode 100644
index 1910436..0000000
--- a/asterix-runtime/src/main/resources/adm.grammar
+++ /dev/null
@@ -1,86 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# LEXER GENERATOR configuration file
-# ---------------------------------------
-# Place the generic configuration *first*,
-# then list your grammar.
-
-PACKAGE:          org.apache.asterix.runtime.operators.file.adm
-LEXER_NAME:       AdmLexer
-
-TOKENS:
-
-BOOLEAN_CONS   = string(boolean)
-INT8_CONS      = string(int8)
-INT16_CONS     = string(int16)
-INT32_CONS     = string(int32)
-INT64_CONS     = string(int64)
-INT64_CONS     = string(int)
-FLOAT_CONS     = string(float)
-DOUBLE_CONS    = string(double)
-DATE_CONS      = string(date)
-DATETIME_CONS  = string(datetime)
-DURATION_CONS  = string(duration)
-STRING_CONS    = string(string)
-HEX_CONS       = string(hex)
-BASE64_CONS    = string(base64)
-POINT_CONS     = string(point)
-POINT3D_CONS   = string(point3d)
-LINE_CONS      = string(line)
-POLYGON_CONS   = string(polygon)
-RECTANGLE_CONS = string(rectangle)
-CIRCLE_CONS    = string(circle)
-TIME_CONS      = string(time)
-INTERVAL_TIME_CONS      = string(interval-time)
-INTERVAL_DATE_CONS      = string(interval-date)
-INTERVAL_DATETIME_CONS  = string(interval-datetime)
-YEAR_MONTH_DURATION_CONS = string(year-month-duration)
-DAY_TIME_DURATION_CONS   = string(day-time-duration)
-UUID_CONS      = string(uuid)
-
-NULL_LITERAL   = string(null)
-TRUE_LITERAL   = string(true)
-FALSE_LITERAL  = string(false)
-
-CONSTRUCTOR_OPEN     = char(()
-CONSTRUCTOR_CLOSE    = char())
-START_RECORD         = char({)
-END_RECORD           = char(})
-COMMA                = char(\,)
-COLON                = char(:)
-START_ORDERED_LIST   = char([)
-END_ORDERED_LIST     = char(])
-START_UNORDERED_LIST = string({{)
-# END_UNORDERED_LIST is not declared: }} is recognized as two consecutive END_RECORD tokens
-
-STRING_LITERAL       = char("), anythingUntil(")
-
-INT_LITERAL          = signOrNothing(), digitSequence()
-INT8_LITERAL         = token(INT_LITERAL), string(i8)
-INT16_LITERAL        = token(INT_LITERAL), string(i16)
-INT32_LITERAL        = token(INT_LITERAL), string(i32)
-INT64_LITERAL        = token(INT_LITERAL), string(i64)
-
-@EXPONENT            = caseInsensitiveChar(e), signOrNothing(), digitSequence()
-
-DOUBLE_LITERAL		 = signOrNothing(), char(.), digitSequence()
-DOUBLE_LITERAL		 = signOrNothing(), digitSequence(), char(.), digitSequence()
-DOUBLE_LITERAL		 = signOrNothing(), digitSequence(), char(.), digitSequence(), token(@EXPONENT)
-DOUBLE_LITERAL		 = signOrNothing(), digitSequence(), token(@EXPONENT)
-
-FLOAT_LITERAL		 = token(DOUBLE_LITERAL), caseInsensitiveChar(f)

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java b/asterix-runtime/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java
deleted file mode 100644
index e23e255..0000000
--- a/asterix-runtime/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operator.file;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.asterix.om.base.AMutableInterval;
-import org.apache.asterix.runtime.operators.file.ADMDataParser;
-import org.junit.Assert;
-import org.junit.Test;
-
-import junit.extensions.PA;
-
-public class ADMDataParserTest {
-
-    @Test
-    public void test() {
-        String[] dateIntervals = { "-9537-08-04, 9656-06-03", "-9537-04-04, 9656-06-04", "-9537-10-04, 9626-09-05" };
-        AMutableInterval[] parsedDateIntervals = new AMutableInterval[] {
-                new AMutableInterval(-4202630, 2807408, (byte) 17), new AMutableInterval(-4202752, 2807409, (byte) 17),
-                new AMutableInterval(-4202569, 2796544, (byte) 17), };
-
-        String[] timeIntervals = { "12:04:45.689Z, 12:41:59.002Z", "12:10:45.169Z, 15:37:48.736Z",
-                "04:16:42.321Z, 12:22:56.816Z" };
-        AMutableInterval[] parsedTimeIntervals = new AMutableInterval[] {
-                new AMutableInterval(43485689, 45719002, (byte) 18),
-                new AMutableInterval(43845169, 56268736, (byte) 18),
-                new AMutableInterval(15402321, 44576816, (byte) 18), };
-
-        String[] dateTimeIntervals = { "-2640-10-11T17:32:15.675Z, 4104-02-01T05:59:11.902Z",
-                "0534-12-08T08:20:31.487Z, 6778-02-16T22:40:21.653Z",
-                "2129-12-12T13:18:35.758Z, 8647-07-01T13:10:19.691Z" };
-        AMutableInterval[] parsedDateTimeIntervals = new AMutableInterval[] {
-                new AMutableInterval(-145452954464325L, 67345192751902L, (byte) 16),
-                new AMutableInterval(-45286270768513L, 151729886421653L, (byte) 16),
-                new AMutableInterval(5047449515758L, 210721439419691L, (byte) 16) };
-
-        Thread[] threads = new Thread[16];
-        AtomicInteger errorCount = new AtomicInteger(0);
-        for (int i = 0; i < threads.length; ++i) {
-            threads[i] = new Thread(new Runnable() {
-                ADMDataParser parser = new ADMDataParser();
-                ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                DataOutput dos = new DataOutputStream(bos);
-
-                @Override
-                public void run() {
-                    try {
-                        int round = 0;
-                        while (round++ < 10000) {
-                            // Test parseDateInterval.
-                            for (int index = 0; index < dateIntervals.length; ++index) {
-                                PA.invokeMethod(parser, "parseDateInterval(java.lang.String, java.io.DataOutput)",
-                                        dateIntervals[index], dos);
-                                AMutableInterval aInterval = (AMutableInterval) PA.getValue(parser, "aInterval");
-                                Assert.assertTrue(aInterval.equals(parsedDateIntervals[index]));
-                            }
-
-                            // Tests parseTimeInterval.
-                            for (int index = 0; index < timeIntervals.length; ++index) {
-                                PA.invokeMethod(parser, "parseTimeInterval(java.lang.String, java.io.DataOutput)",
-                                        timeIntervals[index], dos);
-                                AMutableInterval aInterval = (AMutableInterval) PA.getValue(parser, "aInterval");
-                                Assert.assertTrue(aInterval.equals(parsedTimeIntervals[index]));
-                            }
-
-                            // Tests parseDateTimeInterval.
-                            for (int index = 0; index < dateTimeIntervals.length; ++index) {
-                                PA.invokeMethod(parser, "parseDateTimeInterval(java.lang.String, java.io.DataOutput)",
-                                        dateTimeIntervals[index], dos);
-                                AMutableInterval aInterval = (AMutableInterval) PA.getValue(parser, "aInterval");
-                                Assert.assertTrue(aInterval.equals(parsedDateTimeIntervals[index]));
-                            }
-                        }
-                    } catch (Exception e) {
-                        errorCount.incrementAndGet();
-                        e.printStackTrace();
-                    }
-                }
-            });
-            // Kicks off test threads.
-            threads[i].start();
-        }
-
-        // Joins all the threads.
-        try {
-            for (int i = 0; i < threads.length; ++i) {
-                threads[i].join();
-            }
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(e);
-        }
-        // Asserts no failure.
-        Assert.assertTrue(errorCount.get() == 0);
-    }
-
-}