You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/05/01 04:38:04 UTC

[1/2] incubator-asterixdb-hyracks git commit: Allow retries on IPCSystem.getHandle().

Repository: incubator-asterixdb-hyracks
Updated Branches:
  refs/heads/master 4cc70185b -> ec8d7a2f3


Allow retries on IPCSystem.getHandle().

The NC will retry indefinitely to connect to the CC.

Change-Id: I0f4c15cacd265c3fbe85307af9f5c33577035447
Reviewed-on: https://asterix-gerrit.ics.uci.edu/250
Reviewed-by: Chris Hillery <ce...@lambda.nu>
Tested-by: Chris Hillery <ce...@lambda.nu>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/12bab0d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/12bab0d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/12bab0d3

Branch: refs/heads/master
Commit: 12bab0d36e2fe7031d8be1b2d58ff8bde0382ad8
Parents: 4cc7018
Author: Chris Hillery <ch...@lambda.nu>
Authored: Thu Apr 30 14:53:23 2015 -0700
Committer: Chris Hillery <ce...@lambda.nu>
Committed: Thu Apr 30 15:08:05 2015 -0700

----------------------------------------------------------------------
 .../control/nc/NodeControllerService.java       |  6 +-
 .../uci/ics/hyracks/ipc/impl/HandleState.java   |  1 +
 .../hyracks/ipc/impl/IPCConnectionManager.java  | 59 +++++++++++++++-----
 .../edu/uci/ics/hyracks/ipc/impl/IPCHandle.java |  5 +-
 .../edu/uci/ics/hyracks/ipc/impl/IPCSystem.java |  6 +-
 5 files changed, 54 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 02cdd70..88c05be 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -22,9 +22,7 @@ import java.lang.management.MemoryUsage;
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Hashtable;
 import java.util.List;
@@ -37,8 +35,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -260,7 +256,7 @@ public class NodeControllerService extends AbstractRemoteService {
         init();
 
         datasetNetworkManager.start();
-        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
+        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), -1);
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
index 8a5b979..d597741 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
@@ -19,5 +19,6 @@ enum HandleState {
     CONNECT_SENT,
     CONNECT_RECEIVED,
     CONNECTED,
+    CONNECT_FAILED,
     CLOSED,
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 81294c2..00f1f46 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -87,18 +87,39 @@ public class IPCConnectionManager {
         serverSocketChannel.close();
     }
 
-    IPCHandle getIPCHandle(InetSocketAddress remoteAddress) throws IOException, InterruptedException {
+    /**
+     * Returns a connected handle to {@code remoteAddress}, blocking until the attempt resolves.
+     * A failed attempt is retried after a 5 second pause: indefinitely when {@code retries} is
+     * negative, otherwise until {@code retries} attempts in total have failed (a single attempt
+     * for 0 or 1), at which point an {@link IOException} is thrown.
+     */
+    IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int retries) throws IOException, InterruptedException {
         IPCHandle handle;
-        synchronized (this) {
-            handle = ipcHandleMap.get(remoteAddress);
-            if (handle == null) {
-                handle = new IPCHandle(system, remoteAddress);
-                pendingConnections.add(handle);
-                networkThread.selector.wakeup();
+        int attempt = 1;
+        while (true) {
+            synchronized (this) {
+                handle = ipcHandleMap.get(remoteAddress);
+                if (handle == null) {
+                    handle = new IPCHandle(system, remoteAddress);
+                    pendingConnections.add(handle);
+                    networkThread.selector.wakeup();
+                }
+            }
+            if (handle.waitTillConnected()) {
+                return handle;
+            }
+            if (retries < 0 || attempt < retries) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Connection to " + remoteAddress + (retries < 0 ? " failed, retrying..."
+                            : " failed (Attempt " + attempt + " of " + retries + ")"));
+                }
+                // Must run even when INFO is disabled, or the loop busy-spins and a finite budget never expires.
+                attempt++;
+                Thread.sleep(5000);
+            } else {
+                throw new IOException("Connection failed to " + remoteAddress);
             }
         }
-        handle.waitTillConnected();
-        return handle;
+
     }
 
     synchronized void registerHandle(IPCHandle handle) {
@@ -278,13 +299,21 @@ public class IPCConnectionManager {
                                 handle.setState(HandleState.CONNECT_RECEIVED);
                             } else if (key.isConnectable()) {
                                 SocketChannel channel = (SocketChannel) sc;
-                                if (channel.finishConnect()) {
-                                    IPCHandle handle = (IPCHandle) key.attachment();
-                                    handle.setState(HandleState.CONNECT_SENT);
-                                    registerHandle(handle);
-                                    key.interestOps(SelectionKey.OP_READ);
-                                    write(createInitialReqMessage(handle));
+                                IPCHandle handle = (IPCHandle) key.attachment();
+                                try {
+                                    if (!channel.finishConnect()) {
+                                        throw new Exception("Connection did not finish");
+                                    }
+                                }
+                                catch (Exception e) {
+                                    e.printStackTrace();
+                                    handle.setState(HandleState.CONNECT_FAILED);
+                                    continue;
                                 }
+                                handle.setState(HandleState.CONNECT_SENT);
+                                registerHandle(handle);
+                                key.interestOps(SelectionKey.OP_READ);
+                                write(createInitialReqMessage(handle));
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
index 4908091..337440f 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -113,10 +113,11 @@ final class IPCHandle implements IIPCHandle {
         notifyAll();
     }
 
-    synchronized void waitTillConnected() throws InterruptedException {
-        while (!isConnected()) {
+    synchronized boolean waitTillConnected() throws InterruptedException {
+        while (state != HandleState.CONNECTED && state != HandleState.CONNECT_FAILED) {
             wait();
         }
+        return state == HandleState.CONNECTED;
     }
 
     ByteBuffer getInBuffer() {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 9e7198b..35525c0 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -56,8 +56,12 @@ public class IPCSystem {
     }
 
     public IIPCHandle getHandle(InetSocketAddress remoteAddress) throws IPCException {
+        return getHandle(remoteAddress, 0);
+    }
+
+    public IIPCHandle getHandle(InetSocketAddress remoteAddress, int retries) throws IPCException {
         try {
-            return cMgr.getIPCHandle(remoteAddress);
+            return cMgr.getIPCHandle(remoteAddress, retries);
         } catch (IOException e) {
             throw new IPCException(e);
         } catch (InterruptedException e) {


[2/2] incubator-asterixdb-hyracks git commit: Issue 867: Handle delimited files using CR-only line separators

Posted by im...@apache.org.
Issue 867: Handle delimited files using CR-only line separators

Also simplify record- and field-counting logic.

Change-Id: Ie28abda93fc9e5996008fac8b60aaf906df49cb7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/246
Reviewed-by: Ian Maxon <im...@uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Preston Carman <ec...@ucr.edu>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/ec8d7a2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/ec8d7a2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/ec8d7a2f

Branch: refs/heads/master
Commit: ec8d7a2f30ae057674f6c3c92837cb9756955247
Parents: 12bab0d
Author: Chris Hillery <ch...@lambda.nu>
Authored: Thu Apr 30 17:03:35 2015 -0700
Committer: Chris Hillery <ce...@lambda.nu>
Committed: Thu Apr 30 19:27:14 2015 -0700

----------------------------------------------------------------------
 .../file/DelimitedDataTupleParserFactory.java   |  6 +----
 .../file/FieldCursorForDelimitedDataParser.java | 24 +++++++++++---------
 2 files changed, 14 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ec8d7a2f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index 6fd38d2..5be1eab 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -37,8 +37,6 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
     private char fieldDelimiter;
     private char quote;
 
-    private int fieldCount;
-
     public DelimitedDataTupleParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter) {
         this(fieldParserFactories, fieldDelimiter, '\"');
     }
@@ -47,7 +45,6 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
         this.valueParserFactories = fieldParserFactories;
         this.fieldDelimiter = fieldDelimiter;
         this.quote = quote;
-        this.fieldCount = 0;
     }
 
     @Override
@@ -71,7 +68,7 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
                     while (cursor.nextRecord()) {
                         tb.reset();
                         for (int i = 0; i < valueParsers.length; ++i) {
-                            if (!cursor.nextField(fieldCount)) {
+                            if (!cursor.nextField()) {
                                 break;
                             }
                             // Eliminate double quotes in the field that we are going to parse
@@ -82,7 +79,6 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
                             }
                             valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos);
                             tb.addFieldEndOffset();
-                            fieldCount++;
                         }
                         if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                             FrameUtils.flushFrame(frame, writer);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ec8d7a2f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
index 69ea0b1..780574c 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
@@ -32,7 +32,8 @@ public class FieldCursorForDelimitedDataParser {
     public char[] buffer;
     public int fStart;
     public int fEnd;
-    public int lineCount;
+    public int recordCount;
+    public int fieldCount;
     public int doubleQuoteCount;
     public boolean isDoubleQuoteIncludedInThisField;
 
@@ -69,10 +70,13 @@ public class FieldCursorForDelimitedDataParser {
         doubleQuoteCount = 0;
         startedQuote = false;
         isDoubleQuoteIncludedInThisField = false;
-        lineCount = 1;
+        recordCount = 0;
+        fieldCount = 0;
     }
 
     public boolean nextRecord() throws IOException {
+        recordCount++;
+        fieldCount = 0;
         while (true) {
             switch (state) {
                 case INIT:
@@ -119,12 +123,12 @@ public class FieldCursorForDelimitedDataParser {
                         } else if (ch == '\n' && !startedQuote) {
                             start = p + 1;
                             state = State.EOR;
-                            lineCount++;
                             lastDelimiterPosition = p;
                             break;
                         } else if (ch == '\r' && !startedQuote) {
                             start = p + 1;
                             state = State.CR;
+                            lastDelimiterPosition = p;
                             break;
                         }
                         ++p;
@@ -143,7 +147,6 @@ public class FieldCursorForDelimitedDataParser {
                     if (ch == '\n' && !startedQuote) {
                         ++start;
                         state = State.EOR;
-                        lineCount++;
                     } else {
                         state = State.IN_RECORD;
                         return true;
@@ -167,7 +170,8 @@ public class FieldCursorForDelimitedDataParser {
         }
     }
 
-    public boolean nextField(int fieldCount) throws IOException {
+    public boolean nextField() throws IOException {
+        fieldCount++;
         switch (state) {
             case INIT:
             case EOR:
@@ -217,10 +221,10 @@ public class FieldCursorForDelimitedDataParser {
                             } else {
                                 // In this case, we don't have a quote in the beginning of a field.
                                 throw new IOException(
-                                        "At line: "
-                                                + lineCount
+                                        "At record: "
+                                                + recordCount
                                                 + ", field#: "
-                                                + (fieldCount + 1)
+                                                + fieldCount
                                                 + " - a quote enclosing a field needs to be placed in the beginning of that field.");
                             }
                         }
@@ -262,7 +266,7 @@ public class FieldCursorForDelimitedDataParser {
                                 // There is a quote before the delimiter, however it is not directly placed before the delimiter.
                                 // In this case, we throw an exception.
                                 // quoteCount == doubleQuoteCount * 2 + 2 : only true when we have two quotes except double-quotes.
-                                throw new IOException("At line: " + lineCount + ", field#: " + (fieldCount + 1)
+                                throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
                                         + " -  A quote enclosing a field needs to be followed by the delimiter.");
                             }
                         }
@@ -275,7 +279,6 @@ public class FieldCursorForDelimitedDataParser {
                             fEnd = p;
                             start = p + 1;
                             state = State.EOR;
-                            lineCount++;
                             lastDelimiterPosition = p;
                             return true;
                         } else if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
@@ -286,7 +289,6 @@ public class FieldCursorForDelimitedDataParser {
                             lastDelimiterPosition = p;
                             start = p + 1;
                             state = State.EOR;
-                            lineCount++;
                             startedQuote = false;
                             return true;
                         }