You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:01 UTC
[03/21] incubator-asterixdb git commit: First stage of external data
cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
deleted file mode 100644
index c06caef..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class CounterTimerTupleForwardPolicy implements ITupleForwardPolicy {
-
- public static final String BATCH_SIZE = "batch-size";
- public static final String BATCH_INTERVAL = "batch-interval";
-
- private static final Logger LOGGER = Logger.getLogger(CounterTimerTupleForwardPolicy.class.getName());
-
- private FrameTupleAppender appender;
- private IFrame frame;
- private IFrameWriter writer;
- private int batchSize;
- private long batchInterval;
- private int tuplesInFrame = 0;
- private TimeBasedFlushTask flushTask;
- private Timer timer;
- private Object lock = new Object();
- private boolean activeTimer = false;
-
- public void configure(Map<String, String> configuration) {
- String propValue = (String) configuration.get(BATCH_SIZE);
- if (propValue != null) {
- batchSize = Integer.parseInt(propValue);
- } else {
- batchSize = -1;
- }
-
- propValue = (String) configuration.get(BATCH_INTERVAL);
- if (propValue != null) {
- batchInterval = Long.parseLong(propValue);
- activeTimer = true;
- }
- }
-
- public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
- this.appender = new FrameTupleAppender();
- this.frame = new VSizeFrame(ctx);
- appender.reset(frame, true);
- this.writer = writer;
- if (activeTimer) {
- this.timer = new Timer();
- this.flushTask = new TimeBasedFlushTask(writer, lock);
- timer.scheduleAtFixedRate(flushTask, 0, batchInterval);
- }
- }
-
- public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
- if (activeTimer) {
- synchronized (lock) {
- addTupleToFrame(tb);
- }
- } else {
- addTupleToFrame(tb);
- }
- tuplesInFrame++;
- }
-
- private void addTupleToFrame(ArrayTupleBuilder tb) throws HyracksDataException {
- if (tuplesInFrame == batchSize || !appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("flushing frame containg (" + tuplesInFrame + ") tuples");
- }
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- tuplesInFrame = 0;
- appender.reset(frame, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- }
- }
-
- public void close() throws HyracksDataException {
- if (appender.getTupleCount() > 0) {
- if (activeTimer) {
- synchronized (lock) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- }
- } else {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- }
- }
-
- if (timer != null) {
- timer.cancel();
- }
- }
-
- private class TimeBasedFlushTask extends TimerTask {
-
- private IFrameWriter writer;
- private final Object lock;
-
- public TimeBasedFlushTask(IFrameWriter writer, Object lock) {
- this.writer = writer;
- this.lock = lock;
- }
-
- @Override
- public void run() {
- try {
- if (tuplesInFrame > 0) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("TTL expired flushing frame (" + tuplesInFrame + ")");
- }
- synchronized (lock) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- appender.reset(frame, true);
- tuplesInFrame = 0;
- }
- }
- } catch (HyracksDataException e) {
- e.printStackTrace();
- }
- }
-
- }
-
- @Override
- public TupleForwardPolicyType getType() {
- return TupleForwardPolicyType.COUNTER_TIMER_EXPIRED;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/DelimitedDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/DelimitedDataParser.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/DelimitedDataParser.java
deleted file mode 100644
index ca7c5c4..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/DelimitedDataParser.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import org.apache.asterix.builders.IARecordBuilder;
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.dataflow.data.nontagged.serde.ANullSerializerDeserializer;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.ANull;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.std.file.FieldCursorForDelimitedDataParser;
-
-public class DelimitedDataParser extends AbstractDataParser implements IDataParser {
-
- protected final IValueParserFactory[] valueParserFactories;
- protected final char fieldDelimiter;
- protected final char quote;
- protected final boolean hasHeader;
- protected final ARecordType recordType;
- private IARecordBuilder recBuilder;
- private ArrayBackedValueStorage fieldValueBuffer;
- private DataOutput fieldValueBufferOutput;
- private IValueParser[] valueParsers;
- private FieldCursorForDelimitedDataParser cursor;
- private byte[] fieldTypeTags;
- private int[] fldIds;
- private ArrayBackedValueStorage[] nameBuffers;
- private boolean areAllNullFields;
-
- public DelimitedDataParser(ARecordType recordType, IValueParserFactory[] valueParserFactories, char fieldDelimter,
- char quote, boolean hasHeader) {
- this.recordType = recordType;
- this.valueParserFactories = valueParserFactories;
- this.fieldDelimiter = fieldDelimter;
- this.quote = quote;
- this.hasHeader = hasHeader;
- }
-
- @Override
- public void initialize(InputStream in, ARecordType recordType, boolean datasetRec) throws AsterixException,
- IOException {
-
- valueParsers = new IValueParser[valueParserFactories.length];
- for (int i = 0; i < valueParserFactories.length; ++i) {
- valueParsers[i] = valueParserFactories[i].createValueParser();
- }
-
- fieldValueBuffer = new ArrayBackedValueStorage();
- fieldValueBufferOutput = fieldValueBuffer.getDataOutput();
- recBuilder = new RecordBuilder();
- recBuilder.reset(recordType);
- recBuilder.init();
-
- int n = recordType.getFieldNames().length;
- fieldTypeTags = new byte[n];
- for (int i = 0; i < n; i++) {
- ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
- fieldTypeTags[i] = tag.serialize();
- }
-
- fldIds = new int[n];
- nameBuffers = new ArrayBackedValueStorage[n];
- AMutableString str = new AMutableString(null);
- for (int i = 0; i < n; i++) {
- String name = recordType.getFieldNames()[i];
- fldIds[i] = recBuilder.getFieldId(name);
- if (fldIds[i] < 0) {
- if (!recordType.isOpen()) {
- throw new HyracksDataException("Illegal field " + name + " in closed type " + recordType);
- } else {
- nameBuffers[i] = new ArrayBackedValueStorage();
- fieldNameToBytes(name, str, nameBuffers[i]);
- }
- }
- }
-
- cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote);
- }
-
- @Override
- public boolean parse(DataOutput out) throws AsterixException, IOException {
- if (hasHeader && cursor.recordCount == 0) {
- // Consume all fields of first record
- cursor.nextRecord();
- while (cursor.nextField());
- }
- while (cursor.nextRecord()) {
- recBuilder.reset(recordType);
- recBuilder.init();
- areAllNullFields = true;
-
- for (int i = 0; i < valueParsers.length; ++i) {
- if (!cursor.nextField()) {
- break;
- }
- fieldValueBuffer.reset();
-
- if (cursor.fStart == cursor.fEnd && recordType.getFieldTypes()[i].getTypeTag() != ATypeTag.STRING
- && recordType.getFieldTypes()[i].getTypeTag() != ATypeTag.NULL) {
- // if the field is empty and the type is optional, insert
- // NULL. Note that string type can also process empty field as an
- // empty string
- if (!NonTaggedFormatUtil.isOptional(recordType.getFieldTypes()[i])) {
- throw new AsterixException("At record: " + cursor.recordCount + " - Field " + cursor.fieldCount
- + " is not an optional type so it cannot accept null value. ");
- }
- fieldValueBufferOutput.writeByte(ATypeTag.NULL.serialize());
- ANullSerializerDeserializer.INSTANCE.serialize(ANull.NULL, out);
- } else {
- fieldValueBufferOutput.writeByte(fieldTypeTags[i]);
- // Eliminate doule quotes in the field that we are going to parse
- if (cursor.isDoubleQuoteIncludedInThisField) {
- cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
- cursor.fEnd -= cursor.doubleQuoteCount;
- cursor.isDoubleQuoteIncludedInThisField = false;
- }
- valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart,
- fieldValueBufferOutput);
- areAllNullFields = false;
- }
- if (fldIds[i] < 0) {
- recBuilder.addField(nameBuffers[i], fieldValueBuffer);
- } else {
- recBuilder.addField(fldIds[i], fieldValueBuffer);
- }
- }
-
- if (!areAllNullFields) {
- recBuilder.write(out, true);
- return true;
- }
- }
- return false;
- }
-
- protected void fieldNameToBytes(String fieldName, AMutableString str, ArrayBackedValueStorage buffer)
- throws HyracksDataException {
- buffer.reset();
- DataOutput out = buffer.getDataOutput();
- str.setValue(fieldName);
- try {
- stringSerde.serialize(str, out);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
deleted file mode 100644
index e22180c..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.util.Map;
-
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class FrameFullTupleForwardPolicy implements ITupleForwardPolicy {
-
- private FrameTupleAppender appender;
- private IFrame frame;
- private IFrameWriter writer;
-
- public void configure(Map<String, String> configuration) {
- // no-op
- }
-
- public void initialize(IHyracksCommonContext ctx, IFrameWriter writer)
- throws HyracksDataException {
- this.appender = new FrameTupleAppender();
- this.frame = new VSizeFrame(ctx);
- this.writer = writer;
- appender.reset(frame, true);
- }
-
- public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
- boolean success = appender.append(tb.getFieldEndOffsets(),
- tb.getByteArray(), 0, tb.getSize());
- if (!success) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- appender.reset(frame, true);
- success = appender.append(tb.getFieldEndOffsets(),
- tb.getByteArray(), 0, tb.getSize());
- if (!success) {
- throw new IllegalStateException();
- }
- }
- }
-
- public void close() throws HyracksDataException {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- }
-
- }
-
- @Override
- public TupleForwardPolicyType getType() {
- return TupleForwardPolicyType.FRAME_FULL;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/IDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/IDataParser.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/IDataParser.java
deleted file mode 100644
index ba90e6c..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/IDataParser.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.om.types.ARecordType;
-
-/**
- * Interface implemented by a parser
- */
-public interface IDataParser {
-
- /**
- * Initialize the parser prior to actual parsing.
- *
- * @param in
- * input stream to be parsed
- * @param recordType
- * record type associated with input data
- * @param datasetRec
- * boolean flag set to true if input data represents dataset
- * records.
- * @throws AsterixException
- * @throws IOException
- */
- public void initialize(InputStream in, ARecordType recordType, boolean datasetRec) throws AsterixException,
- IOException;
-
- /**
- * Parse data from source input stream and output ADM records.
- *
- * @param out
- * DataOutput instance that for writing the parser output.
- * @return
- * @throws AsterixException
- * @throws IOException
- */
- public boolean parse(DataOutput out) throws AsterixException, IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateContolledParserPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateContolledParserPolicy.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateContolledParserPolicy.java
deleted file mode 100644
index 7b5d331..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateContolledParserPolicy.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.util.Map;
-
-import org.apache.asterix.common.parse.ITupleParserPolicy;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class RateContolledParserPolicy implements ITupleParserPolicy {
-
- protected FrameTupleAppender appender;
- protected IFrame frame;
- private IFrameWriter writer;
- private long interTupleInterval;
- private boolean delayConfigured;
-
- public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
-
- public RateContolledParserPolicy() {
-
- }
-
- public TupleParserPolicy getType() {
- return ITupleParserPolicy.TupleParserPolicy.FRAME_FULL;
- }
-
-
- @Override
- public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
- if (delayConfigured) {
- try {
- Thread.sleep(interTupleInterval);
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
- if (!success) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- appender.reset(frame, true);
- success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
- if (!success) {
- throw new IllegalStateException();
- }
- }
- appender.reset(frame, true);
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- }
- }
-
- @Override
- public void configure(Map<String, String> configuration) throws HyracksDataException {
- String propValue = configuration.get(INTER_TUPLE_INTERVAL);
- if (propValue != null) {
- interTupleInterval = Long.parseLong(propValue);
- } else {
- interTupleInterval = 0;
- }
- delayConfigured = interTupleInterval != 0;
-
- }
-
- @Override
- public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
- this.appender = new FrameTupleAppender();
- this.frame = new VSizeFrame(ctx);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
deleted file mode 100644
index c5af720..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.util.Map;
-
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class RateControlledTupleForwardPolicy implements ITupleForwardPolicy {
-
- private FrameTupleAppender appender;
- private IFrame frame;
- private IFrameWriter writer;
- private long interTupleInterval;
- private boolean delayConfigured;
-
- public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
-
- public void configure(Map<String, String> configuration) {
- String propValue = configuration.get(INTER_TUPLE_INTERVAL);
- if (propValue != null) {
- interTupleInterval = Long.parseLong(propValue);
- }
- delayConfigured = interTupleInterval != 0;
- }
-
- public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
- this.appender = new FrameTupleAppender();
- this.frame = new VSizeFrame(ctx);
- this.writer = writer;
- appender.reset(frame, true);
- }
-
- public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
- if (delayConfigured) {
- try {
- Thread.sleep(interTupleInterval);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
- if (!success) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- appender.reset(frame, true);
- success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
- if (!success) {
- throw new IllegalStateException();
- }
- }
- }
-
- public void close() throws HyracksDataException {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- }
-
- }
-
- @Override
- public TupleForwardPolicyType getType() {
- return TupleForwardPolicyType.RATE_CONTROLLED;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/resources/adm.grammar
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/resources/adm.grammar b/asterix-runtime/src/main/resources/adm.grammar
deleted file mode 100644
index 1910436..0000000
--- a/asterix-runtime/src/main/resources/adm.grammar
+++ /dev/null
@@ -1,86 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# LEXER GENERATOR configuration file
-# ---------------------------------------
-# Place *first* the generic configuration
-# then list your grammar.
-
-PACKAGE: org.apache.asterix.runtime.operators.file.adm
-LEXER_NAME: AdmLexer
-
-TOKENS:
-
-BOOLEAN_CONS = string(boolean)
-INT8_CONS = string(int8)
-INT16_CONS = string(int16)
-INT32_CONS = string(int32)
-INT64_CONS = string(int64)
-INT64_CONS = string(int)
-FLOAT_CONS = string(float)
-DOUBLE_CONS = string(double)
-DATE_CONS = string(date)
-DATETIME_CONS = string(datetime)
-DURATION_CONS = string(duration)
-STRING_CONS = string(string)
-HEX_CONS = string(hex)
-BASE64_CONS = string(base64)
-POINT_CONS = string(point)
-POINT3D_CONS = string(point3d)
-LINE_CONS = string(line)
-POLYGON_CONS = string(polygon)
-RECTANGLE_CONS = string(rectangle)
-CIRCLE_CONS = string(circle)
-TIME_CONS = string(time)
-INTERVAL_TIME_CONS = string(interval-time)
-INTERVAL_DATE_CONS = string(interval-date)
-INTERVAL_DATETIME_CONS = string(interval-datetime)
-YEAR_MONTH_DURATION_CONS = string(year-month-duration)
-DAY_TIME_DURATION_CONS = string(day-time-duration)
-UUID_CONS = string(uuid)
-
-NULL_LITERAL = string(null)
-TRUE_LITERAL = string(true)
-FALSE_LITERAL = string(false)
-
-CONSTRUCTOR_OPEN = char(()
-CONSTRUCTOR_CLOSE = char())
-START_RECORD = char({)
-END_RECORD = char(})
-COMMA = char(\,)
-COLON = char(:)
-START_ORDERED_LIST = char([)
-END_ORDERED_LIST = char(])
-START_UNORDERED_LIST = string({{)
-# END_UNORDERED_LIST = }} is recognized as a double END_RECORD token
-
-STRING_LITERAL = char("), anythingUntil(")
-
-INT_LITERAL = signOrNothing(), digitSequence()
-INT8_LITERAL = token(INT_LITERAL), string(i8)
-INT16_LITERAL = token(INT_LITERAL), string(i16)
-INT32_LITERAL = token(INT_LITERAL), string(i32)
-INT64_LITERAL = token(INT_LITERAL), string(i64)
-
-@EXPONENT = caseInsensitiveChar(e), signOrNothing(), digitSequence()
-
-DOUBLE_LITERAL = signOrNothing(), char(.), digitSequence()
-DOUBLE_LITERAL = signOrNothing(), digitSequence(), char(.), digitSequence()
-DOUBLE_LITERAL = signOrNothing(), digitSequence(), char(.), digitSequence(), token(@EXPONENT)
-DOUBLE_LITERAL = signOrNothing(), digitSequence(), token(@EXPONENT)
-
-FLOAT_LITERAL = token(DOUBLE_LITERAL), caseInsensitiveChar(f)
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java b/asterix-runtime/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java
deleted file mode 100644
index e23e255..0000000
--- a/asterix-runtime/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operator.file;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.asterix.om.base.AMutableInterval;
-import org.apache.asterix.runtime.operators.file.ADMDataParser;
-import org.junit.Assert;
-import org.junit.Test;
-
-import junit.extensions.PA;
-
-public class ADMDataParserTest {
-
- @Test
- public void test() {
- String[] dateIntervals = { "-9537-08-04, 9656-06-03", "-9537-04-04, 9656-06-04", "-9537-10-04, 9626-09-05" };
- AMutableInterval[] parsedDateIntervals = new AMutableInterval[] {
- new AMutableInterval(-4202630, 2807408, (byte) 17), new AMutableInterval(-4202752, 2807409, (byte) 17),
- new AMutableInterval(-4202569, 2796544, (byte) 17), };
-
- String[] timeIntervals = { "12:04:45.689Z, 12:41:59.002Z", "12:10:45.169Z, 15:37:48.736Z",
- "04:16:42.321Z, 12:22:56.816Z" };
- AMutableInterval[] parsedTimeIntervals = new AMutableInterval[] {
- new AMutableInterval(43485689, 45719002, (byte) 18),
- new AMutableInterval(43845169, 56268736, (byte) 18),
- new AMutableInterval(15402321, 44576816, (byte) 18), };
-
- String[] dateTimeIntervals = { "-2640-10-11T17:32:15.675Z, 4104-02-01T05:59:11.902Z",
- "0534-12-08T08:20:31.487Z, 6778-02-16T22:40:21.653Z",
- "2129-12-12T13:18:35.758Z, 8647-07-01T13:10:19.691Z" };
- AMutableInterval[] parsedDateTimeIntervals = new AMutableInterval[] {
- new AMutableInterval(-145452954464325L, 67345192751902L, (byte) 16),
- new AMutableInterval(-45286270768513L, 151729886421653L, (byte) 16),
- new AMutableInterval(5047449515758L, 210721439419691L, (byte) 16) };
-
- Thread[] threads = new Thread[16];
- AtomicInteger errorCount = new AtomicInteger(0);
- for (int i = 0; i < threads.length; ++i) {
- threads[i] = new Thread(new Runnable() {
- ADMDataParser parser = new ADMDataParser();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutput dos = new DataOutputStream(bos);
-
- @Override
- public void run() {
- try {
- int round = 0;
- while (round++ < 10000) {
- // Test parseDateInterval.
- for (int index = 0; index < dateIntervals.length; ++index) {
- PA.invokeMethod(parser, "parseDateInterval(java.lang.String, java.io.DataOutput)",
- dateIntervals[index], dos);
- AMutableInterval aInterval = (AMutableInterval) PA.getValue(parser, "aInterval");
- Assert.assertTrue(aInterval.equals(parsedDateIntervals[index]));
- }
-
- // Tests parseTimeInterval.
- for (int index = 0; index < timeIntervals.length; ++index) {
- PA.invokeMethod(parser, "parseTimeInterval(java.lang.String, java.io.DataOutput)",
- timeIntervals[index], dos);
- AMutableInterval aInterval = (AMutableInterval) PA.getValue(parser, "aInterval");
- Assert.assertTrue(aInterval.equals(parsedTimeIntervals[index]));
- }
-
- // Tests parseDateTimeInterval.
- for (int index = 0; index < dateTimeIntervals.length; ++index) {
- PA.invokeMethod(parser, "parseDateTimeInterval(java.lang.String, java.io.DataOutput)",
- dateTimeIntervals[index], dos);
- AMutableInterval aInterval = (AMutableInterval) PA.getValue(parser, "aInterval");
- Assert.assertTrue(aInterval.equals(parsedDateTimeIntervals[index]));
- }
- }
- } catch (Exception e) {
- errorCount.incrementAndGet();
- e.printStackTrace();
- }
- }
- });
- // Kicks off test threads.
- threads[i].start();
- }
-
- // Joins all the threads.
- try {
- for (int i = 0; i < threads.length; ++i) {
- threads[i].join();
- }
- } catch (InterruptedException e) {
- throw new IllegalStateException(e);
- }
- // Asserts no failure.
- Assert.assertTrue(errorCount.get() == 0);
- }
-
-}