You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/05/01 04:38:04 UTC
[1/2] incubator-asterixdb-hyracks git commit: Allow retries on
IPCSystem.getHandle().
Repository: incubator-asterixdb-hyracks
Updated Branches:
refs/heads/master 4cc70185b -> ec8d7a2f3
Allow retries on IPCSystem.getHandle().
NC will retry indefinitely to connect CC.
Change-Id: I0f4c15cacd265c3fbe85307af9f5c33577035447
Reviewed-on: https://asterix-gerrit.ics.uci.edu/250
Reviewed-by: Chris Hillery <ce...@lambda.nu>
Tested-by: Chris Hillery <ce...@lambda.nu>
Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/12bab0d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/12bab0d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/12bab0d3
Branch: refs/heads/master
Commit: 12bab0d36e2fe7031d8be1b2d58ff8bde0382ad8
Parents: 4cc7018
Author: Chris Hillery <ch...@lambda.nu>
Authored: Thu Apr 30 14:53:23 2015 -0700
Committer: Chris Hillery <ce...@lambda.nu>
Committed: Thu Apr 30 15:08:05 2015 -0700
----------------------------------------------------------------------
.../control/nc/NodeControllerService.java | 6 +-
.../uci/ics/hyracks/ipc/impl/HandleState.java | 1 +
.../hyracks/ipc/impl/IPCConnectionManager.java | 59 +++++++++++++++-----
.../edu/uci/ics/hyracks/ipc/impl/IPCHandle.java | 5 +-
.../edu/uci/ics/hyracks/ipc/impl/IPCSystem.java | 6 +-
5 files changed, 54 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 02cdd70..88c05be 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -22,9 +22,7 @@ import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
@@ -37,8 +35,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -260,7 +256,7 @@ public class NodeControllerService extends AbstractRemoteService {
init();
datasetNetworkManager.start();
- IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
+ IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), -1);
this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
for (int i = 0; i < gcInfos.length; ++i) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
index 8a5b979..d597741 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
@@ -19,5 +19,6 @@ enum HandleState {
CONNECT_SENT,
CONNECT_RECEIVED,
CONNECTED,
+ CONNECT_FAILED,
CLOSED,
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 81294c2..00f1f46 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -87,18 +87,39 @@ public class IPCConnectionManager {
serverSocketChannel.close();
}
- IPCHandle getIPCHandle(InetSocketAddress remoteAddress) throws IOException, InterruptedException {
+ IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int retries) throws IOException, InterruptedException {
IPCHandle handle;
- synchronized (this) {
- handle = ipcHandleMap.get(remoteAddress);
- if (handle == null) {
- handle = new IPCHandle(system, remoteAddress);
- pendingConnections.add(handle);
- networkThread.selector.wakeup();
+ int attempt = 1;
+ while (true) {
+ synchronized (this) {
+ handle = ipcHandleMap.get(remoteAddress);
+ if (handle == null) {
+ handle = new IPCHandle(system, remoteAddress);
+ pendingConnections.add(handle);
+ networkThread.selector.wakeup();
+ }
+ }
+ if (handle.waitTillConnected()) {
+ return handle;
+ }
+ if (retries < 0) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Connection to " + remoteAddress + " failed, retrying...");
+ attempt++;
+ Thread.sleep(5000);
+ }
+ } else if (attempt < retries) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Connection to " + remoteAddress +
+ " failed (Attempt " + attempt + " of " + retries + ")");
+ attempt++;
+ Thread.sleep(5000);
+ }
+ } else {
+ throw new IOException("Connection failed to " + remoteAddress);
}
}
- handle.waitTillConnected();
- return handle;
+
}
synchronized void registerHandle(IPCHandle handle) {
@@ -278,13 +299,21 @@ public class IPCConnectionManager {
handle.setState(HandleState.CONNECT_RECEIVED);
} else if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) sc;
- if (channel.finishConnect()) {
- IPCHandle handle = (IPCHandle) key.attachment();
- handle.setState(HandleState.CONNECT_SENT);
- registerHandle(handle);
- key.interestOps(SelectionKey.OP_READ);
- write(createInitialReqMessage(handle));
+ IPCHandle handle = (IPCHandle) key.attachment();
+ try {
+ if (!channel.finishConnect()) {
+ throw new Exception("Connection did not finish");
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ handle.setState(HandleState.CONNECT_FAILED);
+ continue;
}
+ handle.setState(HandleState.CONNECT_SENT);
+ registerHandle(handle);
+ key.interestOps(SelectionKey.OP_READ);
+ write(createInitialReqMessage(handle));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
index 4908091..337440f 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -113,10 +113,11 @@ final class IPCHandle implements IIPCHandle {
notifyAll();
}
- synchronized void waitTillConnected() throws InterruptedException {
- while (!isConnected()) {
+ synchronized boolean waitTillConnected() throws InterruptedException {
+ while (state != HandleState.CONNECTED && state != HandleState.CONNECT_FAILED) {
wait();
}
+ return state == HandleState.CONNECTED;
}
ByteBuffer getInBuffer() {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 9e7198b..35525c0 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -56,8 +56,12 @@ public class IPCSystem {
}
public IIPCHandle getHandle(InetSocketAddress remoteAddress) throws IPCException {
+ return getHandle(remoteAddress, 0);
+ }
+
+ public IIPCHandle getHandle(InetSocketAddress remoteAddress, int retries) throws IPCException {
try {
- return cMgr.getIPCHandle(remoteAddress);
+ return cMgr.getIPCHandle(remoteAddress, retries);
} catch (IOException e) {
throw new IPCException(e);
} catch (InterruptedException e) {
[2/2] incubator-asterixdb-hyracks git commit: Issue 867: Handle
delimited files using CR-only line separators
Posted by im...@apache.org.
Issue 867: Handle delimited files using CR-only line separators
Also simplify record- and field-counting logic.
Change-Id: Ie28abda93fc9e5996008fac8b60aaf906df49cb7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/246
Reviewed-by: Ian Maxon <im...@uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Preston Carman <ec...@ucr.edu>
Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/ec8d7a2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/ec8d7a2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/ec8d7a2f
Branch: refs/heads/master
Commit: ec8d7a2f30ae057674f6c3c92837cb9756955247
Parents: 12bab0d
Author: Chris Hillery <ch...@lambda.nu>
Authored: Thu Apr 30 17:03:35 2015 -0700
Committer: Chris Hillery <ce...@lambda.nu>
Committed: Thu Apr 30 19:27:14 2015 -0700
----------------------------------------------------------------------
.../file/DelimitedDataTupleParserFactory.java | 6 +----
.../file/FieldCursorForDelimitedDataParser.java | 24 +++++++++++---------
2 files changed, 14 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ec8d7a2f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index 6fd38d2..5be1eab 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -37,8 +37,6 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
private char fieldDelimiter;
private char quote;
- private int fieldCount;
-
public DelimitedDataTupleParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter) {
this(fieldParserFactories, fieldDelimiter, '\"');
}
@@ -47,7 +45,6 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
this.valueParserFactories = fieldParserFactories;
this.fieldDelimiter = fieldDelimiter;
this.quote = quote;
- this.fieldCount = 0;
}
@Override
@@ -71,7 +68,7 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
while (cursor.nextRecord()) {
tb.reset();
for (int i = 0; i < valueParsers.length; ++i) {
- if (!cursor.nextField(fieldCount)) {
+ if (!cursor.nextField()) {
break;
}
// Eliminate double quotes in the field that we are going to parse
@@ -82,7 +79,6 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
}
valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos);
tb.addFieldEndOffset();
- fieldCount++;
}
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
FrameUtils.flushFrame(frame, writer);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ec8d7a2f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
index 69ea0b1..780574c 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
@@ -32,7 +32,8 @@ public class FieldCursorForDelimitedDataParser {
public char[] buffer;
public int fStart;
public int fEnd;
- public int lineCount;
+ public int recordCount;
+ public int fieldCount;
public int doubleQuoteCount;
public boolean isDoubleQuoteIncludedInThisField;
@@ -69,10 +70,13 @@ public class FieldCursorForDelimitedDataParser {
doubleQuoteCount = 0;
startedQuote = false;
isDoubleQuoteIncludedInThisField = false;
- lineCount = 1;
+ recordCount = 0;
+ fieldCount = 0;
}
public boolean nextRecord() throws IOException {
+ recordCount++;
+ fieldCount = 0;
while (true) {
switch (state) {
case INIT:
@@ -119,12 +123,12 @@ public class FieldCursorForDelimitedDataParser {
} else if (ch == '\n' && !startedQuote) {
start = p + 1;
state = State.EOR;
- lineCount++;
lastDelimiterPosition = p;
break;
} else if (ch == '\r' && !startedQuote) {
start = p + 1;
state = State.CR;
+ lastDelimiterPosition = p;
break;
}
++p;
@@ -143,7 +147,6 @@ public class FieldCursorForDelimitedDataParser {
if (ch == '\n' && !startedQuote) {
++start;
state = State.EOR;
- lineCount++;
} else {
state = State.IN_RECORD;
return true;
@@ -167,7 +170,8 @@ public class FieldCursorForDelimitedDataParser {
}
}
- public boolean nextField(int fieldCount) throws IOException {
+ public boolean nextField() throws IOException {
+ fieldCount++;
switch (state) {
case INIT:
case EOR:
@@ -217,10 +221,10 @@ public class FieldCursorForDelimitedDataParser {
} else {
// In this case, we don't have a quote in the beginning of a field.
throw new IOException(
- "At line: "
- + lineCount
+ "At record: "
+ + recordCount
+ ", field#: "
- + (fieldCount + 1)
+ + fieldCount
+ " - a quote enclosing a field needs to be placed in the beginning of that field.");
}
}
@@ -262,7 +266,7 @@ public class FieldCursorForDelimitedDataParser {
// There is a quote before the delimiter, however it is not directly placed before the delimiter.
// In this case, we throw an exception.
// quoteCount == doubleQuoteCount * 2 + 2 : only true when we have two quotes except double-quotes.
- throw new IOException("At line: " + lineCount + ", field#: " + (fieldCount + 1)
+ throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
+ " - A quote enclosing a field needs to be followed by the delimiter.");
}
}
@@ -275,7 +279,6 @@ public class FieldCursorForDelimitedDataParser {
fEnd = p;
start = p + 1;
state = State.EOR;
- lineCount++;
lastDelimiterPosition = p;
return true;
} else if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
@@ -286,7 +289,6 @@ public class FieldCursorForDelimitedDataParser {
lastDelimiterPosition = p;
start = p + 1;
state = State.EOR;
- lineCount++;
startedQuote = false;
return true;
}